// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Text;
using Microsoft.Reactive.Testing;
using ReactiveTests.Dummies;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Assert = Xunit.Assert;

namespace ReactiveTests.Tests
{
    [TestClass]
    public class GroupByUntilTest : ReactiveTest
    {
        #region + GroupByUntil +

        // Calls GroupByUntil with `default` substituted for one argument at a time
        // (source, key selector, element selector, duration selector, comparer) and
        // wraps every call in ReactiveAssert.Throws.
        // NOTE(review): generic type arguments (on Throws, DummyFunc, DummyObservable,
        // EqualityComparer, ...) look like they were stripped from this text - confirm
        // against the upstream file before compiling.
        [TestMethod]
        public void GroupByUntil_ArgumentChecking()
        {
            // Overload: key selector + element selector + duration selector + comparer.
            ReactiveAssert.Throws(() => Observable.GroupByUntil(default, DummyFunc.Instance, DummyFunc.Instance, DummyFunc, IObservable>.Instance, EqualityComparer.Default));
            ReactiveAssert.Throws(() => Observable.GroupByUntil(DummyObservable.Instance, default, DummyFunc.Instance, DummyFunc, IObservable>.Instance, EqualityComparer.Default));
            ReactiveAssert.Throws(() => Observable.GroupByUntil(DummyObservable.Instance, DummyFunc.Instance, default, DummyFunc, IObservable>.Instance, EqualityComparer.Default));
            ReactiveAssert.Throws(() => Observable.GroupByUntil(DummyObservable.Instance, DummyFunc.Instance, DummyFunc.Instance, default(Func, IObservable>), EqualityComparer.Default));
            ReactiveAssert.Throws(() => Observable.GroupByUntil(DummyObservable.Instance, DummyFunc.Instance, DummyFunc.Instance, DummyFunc, IObservable>.Instance, default(IEqualityComparer)));

            // Overload: key selector + element selector + duration selector.
            ReactiveAssert.Throws(() => Observable.GroupByUntil(default, DummyFunc.Instance, DummyFunc.Instance, DummyFunc, IObservable>.Instance));
            ReactiveAssert.Throws(() => Observable.GroupByUntil(DummyObservable.Instance, default, DummyFunc.Instance, DummyFunc, IObservable>.Instance));
            ReactiveAssert.Throws(() => Observable.GroupByUntil(DummyObservable.Instance, DummyFunc.Instance, default, DummyFunc, IObservable>.Instance));
            ReactiveAssert.Throws(()
=> Observable.GroupByUntil(DummyObservable.Instance, DummyFunc.Instance, DummyFunc.Instance, default(Func, IObservable>)));

            // Overload: key selector + duration selector + comparer.
            ReactiveAssert.Throws(() => Observable.GroupByUntil(default, DummyFunc.Instance, DummyFunc, IObservable>.Instance, EqualityComparer.Default));
            ReactiveAssert.Throws(() => Observable.GroupByUntil(DummyObservable.Instance, default, DummyFunc, IObservable>.Instance, EqualityComparer.Default));
            ReactiveAssert.Throws(() => Observable.GroupByUntil(DummyObservable.Instance, DummyFunc.Instance, default(Func, IObservable>), EqualityComparer.Default));
            ReactiveAssert.Throws(() => Observable.GroupByUntil(DummyObservable.Instance, DummyFunc.Instance, DummyFunc, IObservable>.Instance, default(IEqualityComparer)));

            // Overload: key selector + duration selector.
            ReactiveAssert.Throws(() => Observable.GroupByUntil(default, DummyFunc.Instance, DummyFunc, IObservable>.Instance));
            ReactiveAssert.Throws(() => Observable.GroupByUntil(DummyObservable.Instance, default, DummyFunc, IObservable>.Instance));
            ReactiveAssert.Throws(() => Observable.GroupByUntil(DummyObservable.Instance, DummyFunc.Instance, default(Func, IObservable>)));
        }

        // Groups a hot source by trimmed value using a GroupByComparer, ending each
        // group via g.Skip(2), and projects every group to its key. Asserts the keys
        // emitted by the outer sequence, the source subscription window, and the
        // number of key selector invocations.
        [TestMethod]
        public void GroupByUntil_WithKeyComparer()
        {
            var scheduler = new TestScheduler();

            var keyInvoked = 0;

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnCompleted(570),
                OnNext(580, "error"), OnCompleted(600), OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler);

            var res = scheduler.Start(() =>
                xs.GroupByUntil(
                    value =>
                    {
                        keyInvoked++;
                        return value.Trim();
                    },
                    grp => grp.Skip(2),
                    comparer
                ).Select(grp => grp.Key)
            );

            res.Messages.AssertEqual(
                OnNext(220, "foo"),
                OnNext(270, "baR"),
                OnNext(350, "Baz"),
                OnNext(360, "qux"),
                OnNext(470, "FOO"),
OnCompleted(570)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 570)
            );

            Assert.Equal(12, keyInvoked);
        }

        // Same scenario as the key-comparer test, with an additional element selector
        // (Reverse). Expects the outer sequence to emit five keys and complete at 570,
        // and both selectors to have run 12 times.
        [TestMethod]
        public void GroupByUntil_Outer_Complete()
        {
            var scheduler = new TestScheduler();

            var keyInvoked = 0;
            var eleInvoked = 0;

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnCompleted(570),
                OnNext(580, "error"), OnCompleted(600), OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler);

            var results = scheduler.Start(() =>
                xs.GroupByUntil(
                    value =>
                    {
                        keyInvoked++;
                        return value.Trim();
                    },
                    value =>
                    {
                        eleInvoked++;
                        return Reverse(value);
                    },
                    grp => grp.Skip(2),
                    comparer
                ).Select(grp => grp.Key)
            );

            results.Messages.AssertEqual(
                OnNext(220, "foo"),
                OnNext(270, "baR"),
                OnNext(350, "Baz"),
                OnNext(360, "qux"),
                OnNext(470, "FOO"),
                OnCompleted(570)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 570)
            );

            Assert.Equal(12, keyInvoked);
            Assert.Equal(12, eleInvoked);
        }

        // The source fails with `ex` at 570; the outer sequence is expected to emit
        // the same five keys and then surface that exception at 570.
        [TestMethod]
        public void GroupByUntil_Outer_Error()
        {
            var scheduler = new TestScheduler();

            var keyInvoked = 0;
            var eleInvoked = 0;
            var ex = new Exception();

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnError(570, ex),
                OnNext(580, "error"), OnCompleted(600), OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler);

            var res = scheduler.Start(() =>
                xs.GroupByUntil(
                    x => { keyInvoked++; return x.Trim(); },
                    x => { eleInvoked++; return Reverse(x); },
                    g => g.Skip(2),
                    comparer
).Select(g => g.Key)
            );

            res.Messages.AssertEqual(
                OnNext(220, "foo"),
                OnNext(270, "baR"),
                OnNext(350, "Baz"),
                OnNext(360, "qux"),
                OnNext(470, "FOO"),
                OnError(570, ex)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 570)
            );

            Assert.Equal(12, keyInvoked);
            Assert.Equal(12, eleInvoked);
        }

        // Passes 355 as the disposal time to scheduler.Start. Only the keys emitted
        // before that time are expected, the source subscription ends at 355, and
        // each selector is expected to have run 5 times.
        [TestMethod]
        public void GroupByUntil_Outer_Dispose()
        {
            var scheduler = new TestScheduler();

            var keyInvoked = 0;
            var eleInvoked = 0;

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnCompleted(570),
                OnNext(580, "error"), OnCompleted(600), OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler);

            var results = scheduler.Start(() =>
                xs.GroupByUntil(
                    value =>
                    {
                        keyInvoked++;
                        return value.Trim();
                    },
                    value =>
                    {
                        eleInvoked++;
                        return Reverse(value);
                    },
                    grp => grp.Skip(2),
                    comparer
                ).Select(grp => grp.Key),
                355
            );

            results.Messages.AssertEqual(
                OnNext(220, "foo"),
                OnNext(270, "baR"),
                OnNext(350, "Baz")
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 355)
            );

            Assert.Equal(5, keyInvoked);
            Assert.Equal(5, eleInvoked);
        }

        // The key selector throws `ex` on its 10th invocation; the outer sequence is
        // expected to fail with that exception at 480.
        [TestMethod]
        public void GroupByUntil_Outer_KeyThrow()
        {
            var scheduler = new TestScheduler();

            var keyInvoked = 0;
            var eleInvoked = 0;
            var ex = new Exception();

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnCompleted(570),
                OnNext(580, "error"), OnCompleted(600), OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler);

            var res = scheduler.Start(() =>
xs.GroupByUntil(
                    x => { keyInvoked++; if (keyInvoked == 10) { throw ex; } return x.Trim(); },
                    x => { eleInvoked++; return Reverse(x); },
                    g => g.Skip(2),
                    comparer
                ).Select(g => g.Key)
            );

            res.Messages.AssertEqual(
                OnNext(220, "foo"),
                OnNext(270, "baR"),
                OnNext(350, "Baz"),
                OnNext(360, "qux"),
                OnNext(470, "FOO"),
                OnError(480, ex)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 480)
            );

            // The element selector is not reached for the element whose key selector threw.
            Assert.Equal(10, keyInvoked);
            Assert.Equal(9, eleInvoked);
        }

        // The element selector throws `ex` on its 10th invocation; the outer sequence
        // is expected to fail with that exception at 480, with both selectors having
        // been invoked 10 times.
        [TestMethod]
        public void GroupByUntil_Outer_EleThrow()
        {
            var scheduler = new TestScheduler();

            var keyInvoked = 0;
            var eleInvoked = 0;
            var ex = new Exception();

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnCompleted(570),
                OnNext(580, "error"), OnCompleted(600), OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler);

            var res = scheduler.Start(() =>
                xs.GroupByUntil(
                    x => { keyInvoked++; return x.Trim(); },
                    x => { eleInvoked++; if (eleInvoked == 10) { throw ex; } return Reverse(x); },
                    g => g.Skip(2),
                    comparer
                ).Select(g => g.Key)
            );

            res.Messages.AssertEqual(
                OnNext(220, "foo"),
                OnNext(270, "baR"),
                OnNext(350, "Baz"),
                OnNext(360, "qux"),
                OnNext(470, "FOO"),
                OnError(480, ex)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 480)
            );

            Assert.Equal(10, keyInvoked);
            Assert.Equal(10, eleInvoked);
        }

        // Uses a GroupByComparer constructed with (scheduler, 250, ushort.MaxValue) and
        // expects the outer sequence to fail with comparer.EqualsException at 310.
        // NOTE(review): presumably the second constructor argument is the virtual time
        // from which Equals starts throwing - GroupByComparer is not visible here.
        [TestMethod]
        public void GroupByUntil_Outer_ComparerEqualsThrow()
        {
            var scheduler = new TestScheduler();

            var keyInvoked = 0;
            var eleInvoked = 0;

            // " BAR " is kept on a single line: a regular string literal cannot span lines.
            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnCompleted(570),
                OnNext(580, "error"), OnCompleted(600), OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler, 250, ushort.MaxValue);

            var res = scheduler.Start(() =>
                xs.GroupByUntil(
                    x => { keyInvoked++; return x.Trim(); },
                    x => { eleInvoked++; return Reverse(x); },
                    g => g.Skip(2),
                    comparer
                ).Select(g => g.Key)
            );

            res.Messages.AssertEqual(
                OnNext(220, "foo"),
                OnNext(270, "baR"),
                OnError(310, comparer.EqualsException)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 310)
            );

            Assert.Equal(4, keyInvoked);
            Assert.Equal(3, eleInvoked);
        }

        // Uses a GroupByComparer constructed with (scheduler, ushort.MaxValue, 410) and
        // expects the outer sequence to fail with comparer.HashCodeException at 420.
        // NOTE(review): presumably the third constructor argument is the virtual time
        // from which GetHashCode starts throwing - confirm against GroupByComparer.
        [TestMethod]
        public void GroupByUntil_Outer_ComparerGetHashCodeThrow()
        {
            var scheduler = new TestScheduler();

            var keyInvoked = 0;
            var eleInvoked = 0;

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnCompleted(570),
                OnNext(580, "error"), OnCompleted(600), OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler, ushort.MaxValue, 410);

            var res = scheduler.Start(() =>
                xs.GroupByUntil(
                    x => { keyInvoked++; return x.Trim(); },
                    x => { eleInvoked++; return Reverse(x); },
                    g => g.Skip(2),
                    comparer
                ).Select(g => g.Key)
            );

            res.Messages.AssertEqual(
                OnNext(220, "foo"),
                OnNext(270, "baR"),
                OnNext(350, "Baz"),
                OnNext(360, "qux"),
                OnError(420, comparer.HashCodeException)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 420)
            );

            Assert.Equal(8, keyInvoked);
            Assert.Equal(7, eleInvoked);
        }

        // Subscribes to each group 100 ticks after the outer sequence emits it and
        // checks what each late inner observer receives.
        [TestMethod]
        public void GroupByUntil_Inner_Complete()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"),
OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnCompleted(570),
                OnNext(580, "error"), OnCompleted(600), OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler);
            var outer = default(IObservable>);
            var outerSubscription = default(IDisposable);
            var inners = new Dictionary>();
            var innerSubscriptions = new Dictionary();
            var res = new Dictionary>();

            scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), comparer));

            scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
            {
                var result = scheduler.CreateObserver();
                inners[group.Key] = group;
                res[group.Key] = result;

                // Inner subscription is deliberately delayed by 100 ticks.
                scheduler.ScheduleRelative(100, () => innerSubscriptions[group.Key] = group.Subscribe(result));
            }));

            scheduler.ScheduleAbsolute(Disposed, () =>
            {
                outerSubscription.Dispose();
                foreach (var d in innerSubscriptions.Values)
                {
                    d.Dispose();
                }
            });

            scheduler.Start();

            Assert.Equal(5, inners.Count);

            res["foo"].Messages.AssertEqual(
                OnCompleted(320)
            );

            res["baR"].Messages.AssertEqual(
                OnNext(390, "rab "),
                OnNext(420, " RAB "), // Breaking change > v2.2 - prior to resolving a deadlock, the group would get closed prior to letting this message through
                OnCompleted(420)
            );

            res["Baz"].Messages.AssertEqual(
                OnNext(480, " zab"),
                OnNext(510, " ZAb "), // Breaking change > v2.2 - prior to resolving a deadlock, the group would get closed prior to letting this message through
                OnCompleted(510)
            );

            res["qux"].Messages.AssertEqual(
                OnCompleted(570)
            );

            res["FOO"].Messages.AssertEqual(
                OnCompleted(570)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 570)
            );
        }

        // Subscribes to every group as soon as the outer sequence emits it and checks
        // the full set of messages each inner observer receives.
        [TestMethod]
        public void GroupByUntil_Inner_Complete_All()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"),
OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnCompleted(570),
                OnNext(580, "error"), OnCompleted(600), OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler);
            var outer = default(IObservable>);
            var outerSubscription = default(IDisposable);
            var inners = new Dictionary>();
            var innerSubscriptions = new Dictionary();
            var res = new Dictionary>();

            scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), comparer));

            scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
            {
                var result = scheduler.CreateObserver();
                inners[group.Key] = group;
                res[group.Key] = result;
                innerSubscriptions[group.Key] = group.Subscribe(result);
            }));

            scheduler.ScheduleAbsolute(Disposed, () =>
            {
                outerSubscription.Dispose();
                foreach (var d in innerSubscriptions.Values)
                {
                    d.Dispose();
                }
            });

            scheduler.Start();

            Assert.Equal(5, inners.Count);

            res["foo"].Messages.AssertEqual(
                OnNext(220, "oof "),
                OnNext(240, " OoF "),
                OnNext(310, " Oof"),
                OnCompleted(310)
            );

            res["baR"].Messages.AssertEqual(
                OnNext(270, " Rab"),
                OnNext(390, "rab "),
                OnNext(420, " RAB "),
                OnCompleted(420)
            );

            res["Baz"].Messages.AssertEqual(
                OnNext(350, " zaB "),
                OnNext(480, " zab"),
                OnNext(510, " ZAb "),
                OnCompleted(510)
            );

            res["qux"].Messages.AssertEqual(
                OnNext(360, " xuq "),
                OnCompleted(570)
            );

            res["FOO"].Messages.AssertEqual(
                OnNext(470, " OOF"),
                OnNext(530, " oOf "),
                OnCompleted(570)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 570)
            );
        }

        // The source fails with `ex1` at 570 while inner subscriptions are made 100
        // ticks after each group is emitted. Groups already closed are expected to
        // see OnCompleted; the "qux" and "FOO" groups are expected to see ex1 at 570.
        [TestMethod]
        public void GroupByUntil_Inner_Error()
        {
            var scheduler = new TestScheduler();

            var ex1 = new Exception();

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnError(570, ex1),
                OnNext(580, "error"), OnCompleted(600), OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler);
            var outer = default(IObservable>);
            var outerSubscription = default(IDisposable);
            var inners = new Dictionary>();
            var innerSubscriptions = new Dictionary();
            var res = new Dictionary>();

            scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), comparer));

            // The empty error handler keeps the outer subscription from rethrowing ex1.
            scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
            {
                var result = scheduler.CreateObserver();
                inners[group.Key] = group;
                res[group.Key] = result;
                scheduler.ScheduleRelative(100, () => innerSubscriptions[group.Key] = group.Subscribe(result));
            }, ex => { }));

            scheduler.ScheduleAbsolute(Disposed, () =>
            {
                outerSubscription.Dispose();
                foreach (var d in innerSubscriptions.Values)
                {
                    d.Dispose();
                }
            });

            scheduler.Start();

            Assert.Equal(5, inners.Count);

            res["foo"].Messages.AssertEqual(
                OnCompleted(320)
            );

            res["baR"].Messages.AssertEqual(
                OnNext(390, "rab "),
                OnNext(420, " RAB "), // Breaking change > v2.2 - prior to resolving a deadlock, the group would get closed prior to letting this message through
                OnCompleted(420)
            );

            res["Baz"].Messages.AssertEqual(
                OnNext(480, " zab"),
                OnNext(510, " ZAb "), // Breaking change > v2.2 - prior to resolving a deadlock, the group would get closed prior to letting this message through
                OnCompleted(510)
            );

            res["qux"].Messages.AssertEqual(
                OnError(570, ex1)
            );

            res["FOO"].Messages.AssertEqual(
                OnError(570, ex1)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 570)
            );
        }

        // Disposes the outer subscription and all inner subscriptions at 400 and
        // checks which messages each inner observer had received by then.
        [TestMethod]
        public void GroupByUntil_Inner_Dispose()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"),
OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnCompleted(570),
                OnNext(580, "error"), OnCompleted(600), OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler);
            var outer = default(IObservable>);
            var outerSubscription = default(IDisposable);
            var inners = new Dictionary>();
            var innerSubscriptions = new Dictionary();
            var res = new Dictionary>();

            scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), comparer));

            scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
            {
                var result = scheduler.CreateObserver();
                inners[group.Key] = group;
                res[group.Key] = result;
                innerSubscriptions[group.Key] = group.Subscribe(result);
            }));

            // Tear everything down at 400, before the source completes.
            scheduler.ScheduleAbsolute(400, () =>
            {
                outerSubscription.Dispose();
                foreach (var subscription in innerSubscriptions.Values)
                {
                    subscription.Dispose();
                }
            });

            scheduler.Start();

            Assert.Equal(4, inners.Count);

            res["foo"].Messages.AssertEqual(
                OnNext(220, "oof "),
                OnNext(240, " OoF "),
                OnNext(310, " Oof"),
                OnCompleted(310)
            );

            res["baR"].Messages.AssertEqual(
                OnNext(270, " Rab"),
                OnNext(390, "rab ")
            );

            res["Baz"].Messages.AssertEqual(
                OnNext(350, " zaB ")
            );

            res["qux"].Messages.AssertEqual(
                OnNext(360, " xuq ")
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 400)
            );
        }

        // The key selector throws `ex` on its 6th invocation; groups that are still
        // open at that point are expected to receive OnError(360, ex).
        [TestMethod]
        public void GroupByUntil_Inner_KeyThrow()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnCompleted(570),
OnNext(580, "error"), OnCompleted(600), OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler);
            var outer = default(IObservable>);
            var outerSubscription = default(IDisposable);
            var inners = new Dictionary>();
            var innerSubscriptions = new Dictionary();
            var res = new Dictionary>();

            var keyInvoked = 0;
            var ex = new Exception();

            scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(
                x =>
                {
                    keyInvoked++;
                    if (keyInvoked == 6)
                    {
                        throw ex;
                    }

                    return x.Trim();
                },
                x => Reverse(x),
                g => g.Skip(2),
                comparer));

            // The no-op error handler absorbs the failure on the outer subscription.
            scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
            {
                var result = scheduler.CreateObserver();
                inners[group.Key] = group;
                res[group.Key] = result;
                innerSubscriptions[group.Key] = group.Subscribe(result);
            }, _ => { }));

            scheduler.ScheduleAbsolute(Disposed, () =>
            {
                outerSubscription.Dispose();
                foreach (var subscription in innerSubscriptions.Values)
                {
                    subscription.Dispose();
                }
            });

            scheduler.Start();

            Assert.Equal(3, inners.Count);

            res["foo"].Messages.AssertEqual(
                OnNext(220, "oof "),
                OnNext(240, " OoF "),
                OnNext(310, " Oof"),
                OnCompleted(310)
            );

            res["baR"].Messages.AssertEqual(
                OnNext(270, " Rab"),
                OnError(360, ex)
            );

            res["Baz"].Messages.AssertEqual(
                OnNext(350, " zaB "),
                OnError(360, ex)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 360)
            );
        }

        // The element selector throws `ex` on its 6th invocation; every group that is
        // open at that point, including the newly created "qux" group, is expected to
        // receive OnError(360, ex).
        [TestMethod]
        public void GroupByUntil_Inner_EleThrow()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnCompleted(570),
                OnNext(580, "error"), OnCompleted(600), OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler);
            var outer = default(IObservable>);
            var outerSubscription = default(IDisposable);
            var inners = new
Dictionary>();
            var innerSubscriptions = new Dictionary();
            var res = new Dictionary>();

            var eleInvoked = 0;
            var ex = new Exception();

            scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(
                x => x.Trim(),
                x =>
                {
                    eleInvoked++;
                    if (eleInvoked == 6)
                    {
                        throw ex;
                    }

                    return Reverse(x);
                },
                g => g.Skip(2),
                comparer));

            // The no-op error handler absorbs the failure on the outer subscription.
            scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
            {
                var result = scheduler.CreateObserver();
                inners[group.Key] = group;
                res[group.Key] = result;
                innerSubscriptions[group.Key] = group.Subscribe(result);
            }, _ => { }));

            scheduler.ScheduleAbsolute(Disposed, () =>
            {
                outerSubscription.Dispose();
                foreach (var subscription in innerSubscriptions.Values)
                {
                    subscription.Dispose();
                }
            });

            scheduler.Start();

            Assert.Equal(4, inners.Count);

            res["foo"].Messages.AssertEqual(
                OnNext(220, "oof "),
                OnNext(240, " OoF "),
                OnNext(310, " Oof"),
                OnCompleted(310)
            );

            res["baR"].Messages.AssertEqual(
                OnNext(270, " Rab"),
                OnError(360, ex)
            );

            res["Baz"].Messages.AssertEqual(
                OnNext(350, " zaB "),
                OnError(360, ex)
            );

            res["qux"].Messages.AssertEqual(
                OnError(360, ex)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 360)
            );
        }

        // Uses a GroupByComparer constructed with (scheduler, 400, ushort.MaxValue);
        // the groups still open are expected to receive comparer.EqualsException at 420.
        [TestMethod]
        public void GroupByUntil_Inner_Comparer_EqualsThrow()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnCompleted(570),
                OnNext(580, "error"), OnCompleted(600), OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler, 400, ushort.MaxValue);
            var outer = default(IObservable>);
            var outerSubscription = default(IDisposable);
            var inners = new Dictionary>();
            var innerSubscriptions = new Dictionary();
            var res = new Dictionary>();

            scheduler.ScheduleAbsolute(Created, () =>
outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), comparer));

            // The no-op error handler absorbs the failure on the outer subscription.
            scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
            {
                var result = scheduler.CreateObserver();
                inners[group.Key] = group;
                res[group.Key] = result;
                innerSubscriptions[group.Key] = group.Subscribe(result);
            }, _ => { }));

            scheduler.ScheduleAbsolute(Disposed, () =>
            {
                outerSubscription.Dispose();
                foreach (var subscription in innerSubscriptions.Values)
                {
                    subscription.Dispose();
                }
            });

            scheduler.Start();

            Assert.Equal(4, inners.Count);

            res["foo"].Messages.AssertEqual(
                OnNext(220, "oof "),
                OnNext(240, " OoF "),
                OnNext(310, " Oof"),
                OnCompleted(310)
            );

            res["baR"].Messages.AssertEqual(
                OnNext(270, " Rab"),
                OnNext(390, "rab "),
                OnError(420, comparer.EqualsException)
            );

            res["Baz"].Messages.AssertEqual(
                OnNext(350, " zaB "),
                OnError(420, comparer.EqualsException)
            );

            res["qux"].Messages.AssertEqual(
                OnNext(360, " xuq "),
                OnError(420, comparer.EqualsException)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 420)
            );
        }

        // Uses a GroupByComparer constructed with (scheduler, ushort.MaxValue, 400);
        // the groups still open are expected to receive comparer.HashCodeException at 420.
        [TestMethod]
        public void GroupByUntil_Inner_Comparer_GetHashCodeThrow()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnCompleted(570),
                OnNext(580, "error"), OnCompleted(600), OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler, ushort.MaxValue, 400);
            var outer = default(IObservable>);
            var outerSubscription = default(IDisposable);
            var inners = new Dictionary>();
            var innerSubscriptions = new Dictionary();
            var res = new Dictionary>();

            scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), comparer));

            scheduler.ScheduleAbsolute(Subscribed,
() => outerSubscription = outer.Subscribe(group =>
                {
                    var result = scheduler.CreateObserver();
                    inners[group.Key] = group;
                    res[group.Key] = result;
                    innerSubscriptions[group.Key] = group.Subscribe(result);
                }, _ => { }));

            scheduler.ScheduleAbsolute(Disposed, () =>
            {
                outerSubscription.Dispose();
                foreach (var subscription in innerSubscriptions.Values)
                {
                    subscription.Dispose();
                }
            });

            scheduler.Start();

            Assert.Equal(4, inners.Count);

            res["foo"].Messages.AssertEqual(
                OnNext(220, "oof "),
                OnNext(240, " OoF "),
                OnNext(310, " Oof"),
                OnCompleted(310)
            );

            res["baR"].Messages.AssertEqual(
                OnNext(270, " Rab"),
                OnNext(390, "rab "),
                OnError(420, comparer.HashCodeException)
            );

            res["Baz"].Messages.AssertEqual(
                OnNext(350, " zaB "),
                OnError(420, comparer.HashCodeException)
            );

            res["qux"].Messages.AssertEqual(
                OnNext(360, " xuq "),
                OnError(420, comparer.HashCodeException)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 420)
            );
        }

        // Disposes only the outer subscription at 320 while inner subscriptions stay
        // alive. Expects no further groups on the outer observer, the "baR" group to
        // keep receiving elements until it completes at 420, and the source
        // subscription to last until 420.
        [TestMethod]
        public void GroupByUntil_Outer_Independence()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnCompleted(570),
                OnNext(580, "error"), OnCompleted(600), OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler);
            var outer = default(IObservable>);
            var outerSubscription = default(IDisposable);
            var inners = new Dictionary>();
            var innerSubscriptions = new Dictionary();
            var res = new Dictionary>();
            var outerResults = scheduler.CreateObserver();

            scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), comparer));

            scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
            {
                outerResults.OnNext(group.Key);
                var result =
scheduler.CreateObserver();
                inners[group.Key] = group;
                res[group.Key] = result;
                innerSubscriptions[group.Key] = group.Subscribe(result);
            }, outerResults.OnError, outerResults.OnCompleted));

            scheduler.ScheduleAbsolute(Disposed, () =>
            {
                outerSubscription.Dispose();
                foreach (var subscription in innerSubscriptions.Values)
                {
                    subscription.Dispose();
                }
            });

            // Drop only the outer subscription early; inner subscriptions stay active.
            scheduler.ScheduleAbsolute(320, () => outerSubscription.Dispose());

            scheduler.Start();

            Assert.Equal(2, inners.Count);

            outerResults.Messages.AssertEqual(
                OnNext(220, "foo"),
                OnNext(270, "baR")
            );

            res["foo"].Messages.AssertEqual(
                OnNext(220, "oof "),
                OnNext(240, " OoF "),
                OnNext(310, " Oof"),
                OnCompleted(310)
            );

            res["baR"].Messages.AssertEqual(
                OnNext(270, " Rab"),
                OnNext(390, "rab "),
                OnNext(420, " RAB "),
                OnCompleted(420)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 420)
            );
        }

        // Disposes the "foo" inner subscription at 320 and checks that every group's
        // observer still receives the expected messages.
        [TestMethod]
        public void GroupByUntil_Inner_Independence()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnCompleted(570),
                OnNext(580, "error"), OnCompleted(600), OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler);
            var outer = default(IObservable>);
            var outerSubscription = default(IDisposable);
            var inners = new Dictionary>();
            var innerSubscriptions = new Dictionary();
            var res = new Dictionary>();
            var outerResults = scheduler.CreateObserver();

            scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), comparer));

            scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
            {
                outerResults.OnNext(group.Key);
                var result = scheduler.CreateObserver();
                inners[group.Key] = group;
                res[group.Key] = result;
innerSubscriptions[group.Key] = group.Subscribe(result);
            }, outerResults.OnError, outerResults.OnCompleted));

            scheduler.ScheduleAbsolute(Disposed, () =>
            {
                outerSubscription.Dispose();
                foreach (var subscription in innerSubscriptions.Values)
                {
                    subscription.Dispose();
                }
            });

            // Dispose a single inner subscription; the remaining groups must be unaffected.
            scheduler.ScheduleAbsolute(320, () => innerSubscriptions["foo"].Dispose());

            scheduler.Start();

            Assert.Equal(5, inners.Count);

            res["foo"].Messages.AssertEqual(
                OnNext(220, "oof "),
                OnNext(240, " OoF "),
                OnNext(310, " Oof"),
                OnCompleted(310)
            );

            res["baR"].Messages.AssertEqual(
                OnNext(270, " Rab"),
                OnNext(390, "rab "),
                OnNext(420, " RAB "),
                OnCompleted(420)
            );

            res["Baz"].Messages.AssertEqual(
                OnNext(350, " zaB "),
                OnNext(480, " zab"),
                OnNext(510, " ZAb "),
                OnCompleted(510)
            );

            res["qux"].Messages.AssertEqual(
                OnNext(360, " xuq "),
                OnCompleted(570)
            );

            res["FOO"].Messages.AssertEqual(
                OnNext(470, " OOF"),
                OnNext(530, " oOf "),
                OnCompleted(570)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 570)
            );
        }

        // Disposes several inner subscriptions at different times (280, 320, 355, 400)
        // and checks that each observer only sees messages delivered before its own
        // disposal, while the untouched "FOO" group runs to completion.
        [TestMethod]
        public void GroupByUntil_Inner_Multiple_Independence()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnCompleted(570),
                OnNext(580, "error"), OnCompleted(600), OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler);
            var outer = default(IObservable>);
            var outerSubscription = default(IDisposable);
            var inners = new Dictionary>();
            var innerSubscriptions = new Dictionary();
            var res = new Dictionary>();
            var outerResults = scheduler.CreateObserver();

            scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), comparer));

            scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription =
outer.Subscribe(group =>
                {
                    outerResults.OnNext(group.Key);
                    var result = scheduler.CreateObserver();
                    inners[group.Key] = group;
                    res[group.Key] = result;
                    innerSubscriptions[group.Key] = group.Subscribe(result);
                }, outerResults.OnError, outerResults.OnCompleted));

            scheduler.ScheduleAbsolute(Disposed, () =>
            {
                outerSubscription.Dispose();
                foreach (var subscription in innerSubscriptions.Values)
                {
                    subscription.Dispose();
                }
            });

            // Individual inner disposals, scheduled in the same order as the original.
            scheduler.ScheduleAbsolute(320, () => innerSubscriptions["foo"].Dispose());
            scheduler.ScheduleAbsolute(280, () => innerSubscriptions["baR"].Dispose());
            scheduler.ScheduleAbsolute(355, () => innerSubscriptions["Baz"].Dispose());
            scheduler.ScheduleAbsolute(400, () => innerSubscriptions["qux"].Dispose());

            scheduler.Start();

            Assert.Equal(5, inners.Count);

            res["foo"].Messages.AssertEqual(
                OnNext(220, "oof "),
                OnNext(240, " OoF "),
                OnNext(310, " Oof"),
                OnCompleted(310)
            );

            res["baR"].Messages.AssertEqual(
                OnNext(270, " Rab")
            );

            res["Baz"].Messages.AssertEqual(
                OnNext(350, " zaB ")
            );

            res["qux"].Messages.AssertEqual(
                OnNext(360, " xuq ")
            );

            res["FOO"].Messages.AssertEqual(
                OnNext(470, " OOF"),
                OnNext(530, " oOf "),
                OnCompleted(570)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 570)
            );
        }

        // Captures a group from the outer sequence and subscribes to it at 600, after
        // the source completed at 570; the late observer is expected to receive
        // OnCompleted at 600.
        [TestMethod]
        public void GroupByUntil_Inner_Escape_Complete()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(220, " foo"),
                OnNext(240, " FoO "),
                OnNext(310, "foO "),
                OnNext(470, "FOO "),
                OnNext(530, " fOo "),
                OnCompleted(570)
            );

            var outer = default(IObservable>);
            var outerSubscription = default(IDisposable);
            var inner = default(IObservable);
            var innerSubscription = default(IDisposable);
            var res = scheduler.CreateObserver();

            scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), g => g.Skip(2)));

            scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
            {
                inner = group;
            }));

            scheduler.ScheduleAbsolute(600, () => innerSubscription = inner.Subscribe(res));

            scheduler.ScheduleAbsolute(Disposed, () =>
            {
outerSubscription.Dispose(); innerSubscription.Dispose(); }); scheduler.Start(); xs.Subscriptions.AssertEqual( Subscribe(200, 570) ); res.Messages.AssertEqual( OnCompleted(600) ); }

        // The source terminates with OnError(570, ex). The outer observer stores each group
        // in `inner` and supplies an empty error handler; a subscription to the captured
        // group is made only at time 600.
        // Asserts that the late subscriber records OnError(600, ex) and that the source
        // subscription spans 200-570.
        [TestMethod]
        public void GroupByUntil_Inner_Escape_Error()
        {
            var scheduler = new TestScheduler();

            var ex = new Exception();

            var xs = scheduler.CreateHotObservable(
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(310, "foO "),
                OnNext(470, "FOO "), OnNext(530, " fOo "),
                OnError(570, ex)
            );

            var outer = default(IObservable>);
            var outerSubscription = default(IDisposable);
            var inner = default(IObservable);
            var innerSubscription = default(IDisposable);
            var res = scheduler.CreateObserver();

            scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), g => g.Skip(2)));

            // Capture the group without subscribing; the outer error is deliberately ignored.
            scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => { inner = group; }, _ => { }));

            // Subscribe to the captured group only after the source has failed.
            scheduler.ScheduleAbsolute(600, () => innerSubscription = inner.Subscribe(res));

            scheduler.ScheduleAbsolute(Disposed, () =>
            {
                outerSubscription.Dispose();
                innerSubscription.Dispose();
            });

            scheduler.Start();

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 570)
            );

            res.Messages.AssertEqual(
                OnError(600, ex)
            );
        }

        // The outer subscription is disposed at 290, before anything has subscribed to a
        // group; the captured group is subscribed to at 600.
        // Asserts that the source subscription ends at 290 and that the late inner
        // observer records no messages at all.
        [TestMethod]
        public void GroupByUntil_Inner_Escape_Dispose()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(310, "foO "),
                OnNext(470, "FOO "), OnNext(530, " fOo "),
                OnError(570, new Exception())
            );

            var outer = default(IObservable>);
            var outerSubscription = default(IDisposable);
            var inner = default(IObservable);
            var innerSubscription = default(IDisposable);
            var res = scheduler.CreateObserver();

            scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), g => g.Skip(2)));

            scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => { inner = group; }));

            // Dispose the outer subscription while no inner subscription exists.
            scheduler.ScheduleAbsolute(290, () => outerSubscription.Dispose());

            scheduler.ScheduleAbsolute(600, () =>
innerSubscription = inner.Subscribe(res)); scheduler.ScheduleAbsolute(Disposed, () => { innerSubscription.Dispose(); }); scheduler.Start(); xs.Subscriptions.AssertEqual( Subscribe(200, 290) ); res.Messages.AssertEqual( ); }

        // Exercises the overload taking a key selector, an element selector and a duration
        // selector, with no comparer argument. Keys are the trimmed, lower-cased elements;
        // each group's duration is the group itself after Skip(2).
        // Asserts the sequence of emitted keys ("foo" is emitted a second time at 470),
        // completion at 570, the source subscription window, and that both selectors ran
        // 12 times.
        [TestMethod]
        public void GroupByUntil_Default()
        {
            var scheduler = new TestScheduler();

            // Invocation counters, incremented inside the selectors and asserted at the end.
            var keyInvoked = 0;
            var eleInvoked = 0;

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnCompleted(570), OnNext(580, "error"), OnCompleted(600),
                OnError(650, new Exception())
            );

            var res = scheduler.Start(() =>
                xs.GroupByUntil(
                    x => { keyInvoked++; return x.Trim().ToLower(); },
                    x => { eleInvoked++; return Reverse(x); },
                    g => g.Skip(2)
                ).Select(g => g.Key)
            );

            res.Messages.AssertEqual(
                OnNext(220, "foo"), OnNext(270, "bar"), OnNext(350, "baz"),
                OnNext(360, "qux"), OnNext(470, "foo"),
                OnCompleted(570)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 570)
            );

            Assert.Equal(12, keyInvoked);
            Assert.Equal(12, eleInvoked);
        }

        // The duration selector throws `ex` instead of returning a sequence.
        // Asserts that the result is OnError at 210 (the time of the only source element)
        // and that the source is unsubscribed at 210.
        [TestMethod]
        public void GroupByUntil_DurationSelector_Throws()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(210, "foo")
            );

            var ex = new Exception();

            var res = scheduler.Start(() =>
                xs.GroupByUntil(x => x, g => { throw ex; })
            );

            res.Messages.AssertEqual(
                OnError>(210, ex)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 210)
            );
        }

        // The key selector returns null for elements starting with 'b' and the upper-cased
        // element otherwise; every group's duration is Observable.Never(). Groups are
        // flattened with SelectMany, prefixing each element with its key ("(null)" for the
        // null key).
        // Asserts that all four elements are delivered and the result completes at 500.
        [TestMethod]
        public void GroupByUntil_NullKeys_Simple_Never()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(220, "bar"), OnNext(240, "foo"), OnNext(310, "qux"), OnNext(470, "baz"),
                OnCompleted(500)
            );

            var res = scheduler.Start(() => xs.GroupByUntil(x => x[0] == 'b' ?
null : x.ToUpper(), g => Observable.Never()).SelectMany(g => g, (g, x) => (g.Key ?? "(null)") + x)); res.Messages.AssertEqual( OnNext(220, "(null)bar"), OnNext(240, "FOOfoo"), OnNext(310, "QUXqux"), OnNext(470, "(null)baz"), OnCompleted(500) ); xs.Subscriptions.AssertEqual( Subscribe(200, 500) ); }

        // Null-key grouping where each group's duration is a 50-tick timer on the test
        // scheduler. `n` counts how often the duration selector is called for a null-key
        // group; the test expects 2 (the null-key elements arrive at 220 and 470, so the
        // first null-key group has presumably expired before the second element - confirm).
        // Also asserts that all four elements are delivered and completion at 500.
        [TestMethod]
        public void GroupByUntil_NullKeys_Simple_Expire1()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(220, "bar"), OnNext(240, "foo"), OnNext(310, "qux"), OnNext(470, "baz"),
                OnCompleted(500)
            );

            var n = 0;

            var res = scheduler.Start(() => xs.GroupByUntil(x => x[0] == 'b' ? null : x.ToUpper(), g =>
            {
                if (g.Key == null)
                {
                    n++;
                }

                return Observable.Timer(TimeSpan.FromTicks(50), scheduler);
            }).SelectMany(g => g, (g, x) => (g.Key ?? "(null)") + x));

            Assert.Equal(2, n);

            res.Messages.AssertEqual(
                OnNext(220, "(null)bar"), OnNext(240, "FOOfoo"), OnNext(310, "QUXqux"), OnNext(470, "(null)baz"),
                OnCompleted(500)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 500)
            );
        }

        // Same scenario as the 50-tick timer case, except the duration sequence is
        // Timer(...).IgnoreElements(). The expectations are identical: the duration selector
        // runs twice for the null key, all four elements are delivered, completion at 500.
        [TestMethod]
        public void GroupByUntil_NullKeys_Simple_Expire2()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(220, "bar"), OnNext(240, "foo"), OnNext(310, "qux"), OnNext(470, "baz"),
                OnCompleted(500)
            );

            var n = 0;

            var res = scheduler.Start(() => xs.GroupByUntil(x => x[0] == 'b' ? null : x.ToUpper(), g =>
            {
                if (g.Key == null)
                {
                    n++;
                }

                return Observable.Timer(TimeSpan.FromTicks(50), scheduler).IgnoreElements();
            }).SelectMany(g => g, (g, x) => (g.Key ??
            "(null)") + x));

            Assert.Equal(2, n);

            res.Messages.AssertEqual(
                OnNext(220, "(null)bar"), OnNext(240, "FOOfoo"), OnNext(310, "QUXqux"), OnNext(470, "(null)baz"),
                OnCompleted(500)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 500)
            );
        }

        // The source ends with OnError(500, ex). Only the null-key group is observed
        // (Where(g => g.Key == null)) and its elements are recorded by `nullGroup`.
        // Asserts that the outer error handler received the same `ex` instance and that the
        // null-key group saw "bar", "baz" and then OnError(500, ex).
        [TestMethod]
        public void GroupByUntil_NullKeys_Error()
        {
            var scheduler = new TestScheduler();

            var ex = new Exception();

            var xs = scheduler.CreateHotObservable(
                OnNext(220, "bar"), OnNext(240, "foo"), OnNext(310, "qux"), OnNext(470, "baz"),
                OnError(500, ex)
            );

            var nullGroup = scheduler.CreateObserver();
            // Receives the exception passed to the outer subscription's error handler.
            var err = default(Exception);

            scheduler.ScheduleAbsolute(200, () => xs.GroupByUntil(x => x[0] == 'b' ? null : x.ToUpper(), g => Observable.Never()).Where(g => g.Key == null).Subscribe(g => g.Subscribe(nullGroup), ex_ => err = ex_));
            scheduler.Start();

            Assert.Same(ex, err);

            nullGroup.Messages.AssertEqual(
                OnNext(220, "bar"), OnNext(470, "baz"),
                OnError(500, ex)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 500)
            );
        }

        #endregion

        #region + GroupByUntil w/capacity +

        // Capacity argument passed to the capacity-taking GroupByUntil overloads in this region.
        private const int _groupByUntilCapacity = 1024;

        // Calls the capacity-taking GroupByUntil overloads inside ReactiveAssert.Throws,
        // each time with one argument replaced by `default` (source, key selector, element
        // selector, duration selector or comparer) and finally with a capacity of -1.
        [TestMethod]
        public void GroupByUntil_Capacity_ArgumentChecking()
        {
            ReactiveAssert.Throws(() => Observable.GroupByUntil(default, DummyFunc.Instance, DummyFunc.Instance, DummyFunc, IObservable>.Instance, _groupByUntilCapacity, EqualityComparer.Default));
            ReactiveAssert.Throws(() => Observable.GroupByUntil(DummyObservable.Instance, default, DummyFunc.Instance, DummyFunc, IObservable>.Instance, _groupByUntilCapacity, EqualityComparer.Default));
            ReactiveAssert.Throws(() => Observable.GroupByUntil(DummyObservable.Instance, DummyFunc.Instance, default, DummyFunc, IObservable>.Instance, _groupByUntilCapacity, EqualityComparer.Default));
            ReactiveAssert.Throws(() => Observable.GroupByUntil(DummyObservable.Instance, DummyFunc.Instance, DummyFunc.Instance, default(Func, IObservable>), _groupByUntilCapacity, EqualityComparer.Default));
            ReactiveAssert.Throws(() => Observable.GroupByUntil(DummyObservable.Instance, DummyFunc.Instance,
DummyFunc.Instance, DummyFunc, IObservable>.Instance, _groupByUntilCapacity, default)); ReactiveAssert.Throws(() => Observable.GroupByUntil(default, DummyFunc.Instance, DummyFunc.Instance, DummyFunc, IObservable>.Instance, _groupByUntilCapacity)); ReactiveAssert.Throws(() => Observable.GroupByUntil(DummyObservable.Instance, default, DummyFunc.Instance, DummyFunc, IObservable>.Instance, _groupByUntilCapacity)); ReactiveAssert.Throws(() => Observable.GroupByUntil(DummyObservable.Instance, DummyFunc.Instance, default, DummyFunc, IObservable>.Instance, _groupByUntilCapacity)); ReactiveAssert.Throws(() => Observable.GroupByUntil(DummyObservable.Instance, DummyFunc.Instance, DummyFunc.Instance, default(Func, IObservable>), _groupByUntilCapacity)); ReactiveAssert.Throws(() => Observable.GroupByUntil(default, DummyFunc.Instance, DummyFunc, IObservable>.Instance, _groupByUntilCapacity, EqualityComparer.Default)); ReactiveAssert.Throws(() => Observable.GroupByUntil(DummyObservable.Instance, default, DummyFunc, IObservable>.Instance, _groupByUntilCapacity, EqualityComparer.Default)); ReactiveAssert.Throws(() => Observable.GroupByUntil(DummyObservable.Instance, DummyFunc.Instance, default(Func, IObservable>), _groupByUntilCapacity, EqualityComparer.Default)); ReactiveAssert.Throws(() => Observable.GroupByUntil(DummyObservable.Instance, DummyFunc.Instance, DummyFunc, IObservable>.Instance, _groupByUntilCapacity, default)); ReactiveAssert.Throws(() => Observable.GroupByUntil(default, DummyFunc.Instance, DummyFunc, IObservable>.Instance, _groupByUntilCapacity)); ReactiveAssert.Throws(() => Observable.GroupByUntil(DummyObservable.Instance, default, DummyFunc, IObservable>.Instance, _groupByUntilCapacity)); ReactiveAssert.Throws(() => Observable.GroupByUntil(DummyObservable.Instance, DummyFunc.Instance, default(Func, IObservable>), _groupByUntilCapacity)); ReactiveAssert.Throws(() => Observable.GroupByUntil(DummyObservable.Instance, DummyFunc.Instance, DummyFunc.Instance, 
DummyFunc, IObservable>.Instance, -1, EqualityComparer.Default)); ReactiveAssert.Throws(() => Observable.GroupByUntil(DummyObservable.Instance, DummyFunc.Instance, DummyFunc.Instance, DummyFunc, IObservable>.Instance, -1)); ReactiveAssert.Throws(() => Observable.GroupByUntil(DummyObservable.Instance, DummyFunc.Instance, DummyFunc, IObservable>.Instance, -1, EqualityComparer.Default)); ReactiveAssert.Throws(() => Observable.GroupByUntil(DummyObservable.Instance, DummyFunc.Instance, DummyFunc, IObservable>.Instance, -1)); }

        // Exercises the overload taking key selector, duration selector, capacity and
        // comparer (no element selector).
        // Asserts the emitted keys (each reported with the casing of the element that opened
        // the group, e.g. "baR"), completion at 570, the source subscription window, and
        // 12 key-selector invocations.
        [TestMethod]
        public void GroupByUntil_Capacity_WithKeyComparer()
        {
            var scheduler = new TestScheduler();

            var keyInvoked = 0;

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnCompleted(570), OnNext(580, "error"), OnCompleted(600),
                OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler);

            var res = scheduler.Start(() =>
                xs.GroupByUntil(
                    x => { keyInvoked++; return x.Trim(); },
                    g => g.Skip(2),
                    _groupByUntilCapacity,
                    comparer
                ).Select(g => g.Key)
            );

            res.Messages.AssertEqual(
                OnNext(220, "foo"), OnNext(270, "baR"), OnNext(350, "Baz"),
                OnNext(360, "qux"), OnNext(470, "FOO"),
                OnCompleted(570)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 570)
            );

            Assert.Equal(12, keyInvoked);
        }

        // Capacity overload with key selector, element selector, duration selector and
        // comparer; the source completes at 570.
        // Asserts the emitted keys, OnCompleted(570) on the outer sequence, the source
        // subscription window, and 12 invocations of each selector.
        [TestMethod]
        public void GroupByUntil_Capacity_Outer_Complete()
        {
            var scheduler = new TestScheduler();

            var keyInvoked = 0;
            var eleInvoked = 0;

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480,
                "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnCompleted(570), OnNext(580, "error"), OnCompleted(600),
                OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler);

            var res = scheduler.Start(() =>
                xs.GroupByUntil(
                    x => { keyInvoked++; return x.Trim(); },
                    x => { eleInvoked++; return Reverse(x); },
                    g => g.Skip(2),
                    _groupByUntilCapacity,
                    comparer
                ).Select(g => g.Key)
            );

            res.Messages.AssertEqual(
                OnNext(220, "foo"), OnNext(270, "baR"), OnNext(350, "Baz"),
                OnNext(360, "qux"), OnNext(470, "FOO"),
                OnCompleted(570)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 570)
            );

            Assert.Equal(12, keyInvoked);
            Assert.Equal(12, eleInvoked);
        }

        // Same query as the completion case, but the source fails with OnError(570, ex).
        // Asserts the emitted keys followed by OnError(570, ex) on the outer sequence, the
        // source subscription window, and 12 invocations of each selector.
        [TestMethod]
        public void GroupByUntil_Capacity_Outer_Error()
        {
            var scheduler = new TestScheduler();

            var keyInvoked = 0;
            var eleInvoked = 0;
            var ex = new Exception();

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnError(570, ex), OnNext(580, "error"), OnCompleted(600),
                OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler);

            var res = scheduler.Start(() =>
                xs.GroupByUntil(
                    x => { keyInvoked++; return x.Trim(); },
                    x => { eleInvoked++; return Reverse(x); },
                    g => g.Skip(2),
                    _groupByUntilCapacity,
                    comparer
                ).Select(g => g.Key)
            );

            res.Messages.AssertEqual(
                OnNext(220, "foo"), OnNext(270, "baR"), OnNext(350, "Baz"),
                OnNext(360, "qux"), OnNext(470, "FOO"),
                OnError(570, ex)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 570)
            );

            Assert.Equal(12, keyInvoked);
            Assert.Equal(12, eleInvoked);
        }

        // Runs the capacity overload with 355 passed as the extra argument to
        // scheduler.Start. Asserts that only the keys emitted up to 350 are recorded, that
        // the source subscription ends at 355, and that each selector ran 5 times.
        [TestMethod]
        public void GroupByUntil_Capacity_Outer_Dispose()
        {
            var scheduler = new TestScheduler();

            var keyInvoked = 0;
            var eleInvoked = 0;

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"),
OnNext(110, "error"), OnNext(130, "error"), OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "), OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "), OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "), OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "), OnCompleted(570), OnNext(580, "error"), OnCompleted(600), OnError(650, new Exception()) ); var comparer = new GroupByComparer(scheduler); var res = scheduler.Start(() => xs.GroupByUntil( x => { keyInvoked++; return x.Trim(); }, x => { eleInvoked++; return Reverse(x); }, g => g.Skip(2), _groupByUntilCapacity, comparer ).Select(g => g.Key), 355 ); res.Messages.AssertEqual( OnNext(220, "foo"), OnNext(270, "baR"), OnNext(350, "Baz") ); xs.Subscriptions.AssertEqual( Subscribe(200, 355) ); Assert.Equal(5, keyInvoked); Assert.Equal(5, eleInvoked); }

        // The key selector throws `ex` on its 10th invocation (the element at 480).
        // Asserts the keys emitted before that, OnError(480, ex) on the outer sequence, a
        // source subscription of 200-480, 10 key-selector invocations and 9 element-selector
        // invocations.
        [TestMethod]
        public void GroupByUntil_Capacity_Outer_KeyThrow()
        {
            var scheduler = new TestScheduler();

            var keyInvoked = 0;
            var eleInvoked = 0;
            var ex = new Exception();

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnCompleted(570), OnNext(580, "error"), OnCompleted(600),
                OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler);

            var res = scheduler.Start(() =>
                xs.GroupByUntil(
                    x => { keyInvoked++; if (keyInvoked == 10) { throw ex; } return x.Trim(); },
                    x => { eleInvoked++; return Reverse(x); },
                    g => g.Skip(2),
                    _groupByUntilCapacity,
                    comparer
                ).Select(g => g.Key)
            );

            res.Messages.AssertEqual(
                OnNext(220, "foo"), OnNext(270, "baR"), OnNext(350, "Baz"),
                OnNext(360, "qux"), OnNext(470, "FOO"),
                OnError(480, ex)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 480)
            );

            Assert.Equal(10, keyInvoked);
Assert.Equal(9, eleInvoked); }

        // The element selector throws `ex` on its 10th invocation (the element at 480).
        // Asserts the keys emitted before that, OnError(480, ex) on the outer sequence, a
        // source subscription of 200-480, and 10 invocations of each selector.
        [TestMethod]
        public void GroupByUntil_Capacity_Outer_EleThrow()
        {
            var scheduler = new TestScheduler();

            var keyInvoked = 0;
            var eleInvoked = 0;
            var ex = new Exception();

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnCompleted(570), OnNext(580, "error"), OnCompleted(600),
                OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler);

            var res = scheduler.Start(() =>
                xs.GroupByUntil(
                    x => { keyInvoked++; return x.Trim(); },
                    x => { eleInvoked++; if (eleInvoked == 10) { throw ex; } return Reverse(x); },
                    g => g.Skip(2),
                    _groupByUntilCapacity,
                    comparer
                ).Select(g => g.Key)
            );

            res.Messages.AssertEqual(
                OnNext(220, "foo"), OnNext(270, "baR"), OnNext(350, "Baz"),
                OnNext(360, "qux"), OnNext(470, "FOO"),
                OnError(480, ex)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 480)
            );

            Assert.Equal(10, keyInvoked);
            Assert.Equal(10, eleInvoked);
        }

        // Uses a GroupByComparer constructed with (scheduler, 250, ushort.MaxValue).
        // Asserts that the outer sequence emits "foo" and "baR" and then fails at 310 with
        // comparer.EqualsException, with 4 key-selector and 3 element-selector invocations.
        // NOTE(review): presumably 250 is the virtual time after which the comparer's Equals
        // starts throwing - confirm against GroupByComparer.
        [TestMethod]
        public void GroupByUntil_Capacity_Outer_ComparerEqualsThrow()
        {
            var scheduler = new TestScheduler();

            var keyInvoked = 0;
            var eleInvoked = 0;

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnCompleted(570), OnNext(580, "error"), OnCompleted(600),
                OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler, 250, ushort.MaxValue);

            var res = scheduler.Start(() =>
                xs.GroupByUntil(
                    x => { keyInvoked++; return x.Trim(); },
                    x => { eleInvoked++; return
Reverse(x); }, g => g.Skip(2), _groupByUntilCapacity, comparer ).Select(g => g.Key) ); res.Messages.AssertEqual( OnNext(220, "foo"), OnNext(270, "baR"), OnError(310, comparer.EqualsException) ); xs.Subscriptions.AssertEqual( Subscribe(200, 310) ); Assert.Equal(4, keyInvoked); Assert.Equal(3, eleInvoked); }

        // Uses a GroupByComparer constructed with (scheduler, ushort.MaxValue, 410).
        // Asserts that the outer sequence emits four keys and then fails at 420 with
        // comparer.HashCodeException, with 8 key-selector and 7 element-selector invocations.
        // NOTE(review): presumably 410 is the virtual time after which the comparer's
        // GetHashCode starts throwing - confirm against GroupByComparer.
        [TestMethod]
        public void GroupByUntil_Capacity_Outer_ComparerGetHashCodeThrow()
        {
            var scheduler = new TestScheduler();

            var keyInvoked = 0;
            var eleInvoked = 0;

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnCompleted(570), OnNext(580, "error"), OnCompleted(600),
                OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler, ushort.MaxValue, 410);

            var res = scheduler.Start(() =>
                xs.GroupByUntil(
                    x => { keyInvoked++; return x.Trim(); },
                    x => { eleInvoked++; return Reverse(x); },
                    g => g.Skip(2),
                    _groupByUntilCapacity,
                    comparer
                ).Select(g => g.Key)
            );

            res.Messages.AssertEqual(
                OnNext(220, "foo"), OnNext(270, "baR"), OnNext(350, "Baz"),
                OnNext(360, "qux"),
                OnError(420, comparer.HashCodeException)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 420)
            );

            Assert.Equal(8, keyInvoked);
            Assert.Equal(7, eleInvoked);
        }

        // Subscribes to each group 100 ticks after the group is emitted (ScheduleRelative)
        // and asserts what those late inner observers record - for example "foo" records
        // only OnCompleted(320) - plus five groups in total and a source subscription of
        // 200-570.
        [TestMethod]
        public void GroupByUntil_Capacity_Inner_Complete()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnCompleted(570), OnNext(580, "error"), OnCompleted(600),
                OnError(650, new
Exception()) ); var comparer = new GroupByComparer(scheduler); var outer = default(IObservable>); var outerSubscription = default(IDisposable); var inners = new Dictionary>(); var innerSubscriptions = new Dictionary(); var res = new Dictionary>(); scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer)); scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => { var result = scheduler.CreateObserver(); inners[group.Key] = group; res[group.Key] = result; scheduler.ScheduleRelative(100, () => innerSubscriptions[group.Key] = group.Subscribe(result)); })); scheduler.ScheduleAbsolute(Disposed, () => { outerSubscription.Dispose(); foreach (var d in innerSubscriptions.Values) { d.Dispose(); } }); scheduler.Start(); Assert.Equal(5, inners.Count); res["foo"].Messages.AssertEqual( OnCompleted(320) ); res["baR"].Messages.AssertEqual( OnNext(390, "rab "), OnNext(420, " RAB "), // Breaking change > v2.2 - prior to resolving a deadlock, the group would get closed prior to letting this message through OnCompleted(420) ); res["Baz"].Messages.AssertEqual( OnNext(480, " zab"), OnNext(510, " ZAb "), // Breaking change > v2.2 - prior to resolving a deadlock, the group would get closed prior to letting this message through OnCompleted(510) ); res["qux"].Messages.AssertEqual( OnCompleted(570) ); res["FOO"].Messages.AssertEqual( OnCompleted(570) ); xs.Subscriptions.AssertEqual( Subscribe(200, 570) ); } [TestMethod] public void GroupByUntil_Capacity_Inner_Complete_All() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"), OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "), OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "), OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "), OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "), 
OnCompleted(570), OnNext(580, "error"), OnCompleted(600), OnError(650, new Exception()) ); var comparer = new GroupByComparer(scheduler); var outer = default(IObservable>); var outerSubscription = default(IDisposable); var inners = new Dictionary>(); var innerSubscriptions = new Dictionary(); var res = new Dictionary>(); scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer)); scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => { var result = scheduler.CreateObserver(); inners[group.Key] = group; res[group.Key] = result; innerSubscriptions[group.Key] = group.Subscribe(result); })); scheduler.ScheduleAbsolute(Disposed, () => { outerSubscription.Dispose(); foreach (var d in innerSubscriptions.Values) { d.Dispose(); } }); scheduler.Start(); Assert.Equal(5, inners.Count); res["foo"].Messages.AssertEqual( OnNext(220, "oof "), OnNext(240, " OoF "), OnNext(310, " Oof"), OnCompleted(310) ); res["baR"].Messages.AssertEqual( OnNext(270, " Rab"), OnNext(390, "rab "), OnNext(420, " RAB "), OnCompleted(420) ); res["Baz"].Messages.AssertEqual( OnNext(350, " zaB "), OnNext(480, " zab"), OnNext(510, " ZAb "), OnCompleted(510) ); res["qux"].Messages.AssertEqual( OnNext(360, " xuq "), OnCompleted(570) ); res["FOO"].Messages.AssertEqual( OnNext(470, " OOF"), OnNext(530, " oOf "), OnCompleted(570) ); xs.Subscriptions.AssertEqual( Subscribe(200, 570) ); }

        // The source fails with OnError(570, ex1) and each group is subscribed to 100 ticks
        // after it is emitted. Asserts that the observers for "qux" and "FOO" record
        // OnError(570, ex1), while "foo", "baR" and "Baz" end with OnCompleted; also five
        // groups in total and a source subscription of 200-570.
        [TestMethod]
        public void GroupByUntil_Capacity_Inner_Error()
        {
            var scheduler = new TestScheduler();

            var ex1 = new Exception();

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnError(570, ex1),
OnNext(580, "error"), OnCompleted(600), OnError(650, new Exception()) ); var comparer = new GroupByComparer(scheduler); var outer = default(IObservable>); var outerSubscription = default(IDisposable); var inners = new Dictionary>(); var innerSubscriptions = new Dictionary(); var res = new Dictionary>(); scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer)); scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => { var result = scheduler.CreateObserver(); inners[group.Key] = group; res[group.Key] = result; scheduler.ScheduleRelative(100, () => innerSubscriptions[group.Key] = group.Subscribe(result)); }, ex => { })); scheduler.ScheduleAbsolute(Disposed, () => { outerSubscription.Dispose(); foreach (var d in innerSubscriptions.Values) { d.Dispose(); } }); scheduler.Start(); Assert.Equal(5, inners.Count); res["foo"].Messages.AssertEqual( OnCompleted(320) ); res["baR"].Messages.AssertEqual( OnNext(390, "rab "), OnNext(420, " RAB "), // Breaking change > v2.2 - prior to resolving a deadlock, the group would get closed prior to letting this message through OnCompleted(420) ); res["Baz"].Messages.AssertEqual( OnNext(480, " zab"), OnNext(510, " ZAb "), // Breaking change > v2.2 - prior to resolving a deadlock, the group would get closed prior to letting this message through OnCompleted(510) ); res["qux"].Messages.AssertEqual( OnError(570, ex1) ); res["FOO"].Messages.AssertEqual( OnError(570, ex1) ); xs.Subscriptions.AssertEqual( Subscribe(200, 570) ); } [TestMethod] public void GroupByUntil_Capacity_Inner_Dispose() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"), OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "), OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "), OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "), 
OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "), OnCompleted(570), OnNext(580, "error"), OnCompleted(600), OnError(650, new Exception()) ); var comparer = new GroupByComparer(scheduler); var outer = default(IObservable>); var outerSubscription = default(IDisposable); var inners = new Dictionary>(); var innerSubscriptions = new Dictionary(); var res = new Dictionary>(); scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer)); scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => { var result = scheduler.CreateObserver(); inners[group.Key] = group; res[group.Key] = result; innerSubscriptions[group.Key] = group.Subscribe(result); })); scheduler.ScheduleAbsolute(400, () => { outerSubscription.Dispose(); foreach (var d in innerSubscriptions.Values) { d.Dispose(); } }); scheduler.Start(); Assert.Equal(4, inners.Count); res["foo"].Messages.AssertEqual( OnNext(220, "oof "), OnNext(240, " OoF "), OnNext(310, " Oof"), OnCompleted(310) ); res["baR"].Messages.AssertEqual( OnNext(270, " Rab"), OnNext(390, "rab ") ); res["Baz"].Messages.AssertEqual( OnNext(350, " zaB ") ); res["qux"].Messages.AssertEqual( OnNext(360, " xuq ") ); xs.Subscriptions.AssertEqual( Subscribe(200, 400) ); }

        // The key selector throws `ex` on its 6th invocation (the element at 360).
        // Asserts that three groups were created, that the "baR" and "Baz" observers record
        // OnError(360, ex) while "foo" had already completed at 310, and that the source is
        // unsubscribed at 360.
        [TestMethod]
        public void GroupByUntil_Capacity_Inner_KeyThrow()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnCompleted(570), OnNext(580, "error"), OnCompleted(600),
                OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler);
            var outer = default(IObservable>);
            var outerSubscription =
default(IDisposable); var inners = new Dictionary>(); var innerSubscriptions = new Dictionary(); var res = new Dictionary>(); var keyInvoked = 0; var ex = new Exception(); scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => { keyInvoked++; if (keyInvoked == 6) { throw ex; } return x.Trim(); }, x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer)); scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => { var result = scheduler.CreateObserver(); inners[group.Key] = group; res[group.Key] = result; innerSubscriptions[group.Key] = group.Subscribe(result); }, _ => { })); scheduler.ScheduleAbsolute(Disposed, () => { outerSubscription.Dispose(); foreach (var d in innerSubscriptions.Values) { d.Dispose(); } }); scheduler.Start(); Assert.Equal(3, inners.Count); res["foo"].Messages.AssertEqual( OnNext(220, "oof "), OnNext(240, " OoF "), OnNext(310, " Oof"), OnCompleted(310) ); res["baR"].Messages.AssertEqual( OnNext(270, " Rab"), OnError(360, ex) ); res["Baz"].Messages.AssertEqual( OnNext(350, " zaB "), OnError(360, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 360) ); }

        // The element selector throws `ex` on its 6th invocation (the element at 360).
        // Asserts that four groups were created ("qux" is created and records only
        // OnError(360, ex)), that "baR" and "Baz" also record OnError(360, ex), and that the
        // source is unsubscribed at 360.
        [TestMethod]
        public void GroupByUntil_Capacity_Inner_EleThrow()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnCompleted(570), OnNext(580, "error"), OnCompleted(600),
                OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler);
            var outer = default(IObservable>);
            var outerSubscription = default(IDisposable);
            // Per-key bookkeeping: the group itself, its subscription and its recording observer.
            var inners = new Dictionary>();
            var innerSubscriptions = new Dictionary();
            var res = new Dictionary>();

            var eleInvoked = 0;
            var ex = new Exception();
scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => { eleInvoked++; if (eleInvoked == 6) { throw ex; } return Reverse(x); }, g => g.Skip(2), _groupByUntilCapacity, comparer)); scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => { var result = scheduler.CreateObserver(); inners[group.Key] = group; res[group.Key] = result; innerSubscriptions[group.Key] = group.Subscribe(result); }, _ => { })); scheduler.ScheduleAbsolute(Disposed, () => { outerSubscription.Dispose(); foreach (var d in innerSubscriptions.Values) { d.Dispose(); } }); scheduler.Start(); Assert.Equal(4, inners.Count); res["foo"].Messages.AssertEqual( OnNext(220, "oof "), OnNext(240, " OoF "), OnNext(310, " Oof"), OnCompleted(310) ); res["baR"].Messages.AssertEqual( OnNext(270, " Rab"), OnError(360, ex) ); res["Baz"].Messages.AssertEqual( OnNext(350, " zaB "), OnError(360, ex) ); res["qux"].Messages.AssertEqual( OnError(360, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 360) ); }

        // Uses a GroupByComparer constructed with (scheduler, 400, ushort.MaxValue) and
        // subscribes to every group as it is emitted.
        // Asserts four groups, that the "baR", "Baz" and "qux" observers record
        // OnError(420, comparer.EqualsException), and a source subscription of 200-420.
        // NOTE(review): presumably 400 is the virtual time after which the comparer's Equals
        // starts throwing - confirm against GroupByComparer.
        [TestMethod]
        public void GroupByUntil_Capacity_Inner_Comparer_EqualsThrow()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnCompleted(570), OnNext(580, "error"), OnCompleted(600),
                OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler, 400, ushort.MaxValue);
            var outer = default(IObservable>);
            var outerSubscription = default(IDisposable);
            // Per-key bookkeeping: the group itself, its subscription and its recording observer.
            var inners = new Dictionary>();
            var innerSubscriptions = new Dictionary();
            var res = new Dictionary>();

            scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity,
comparer)); scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => { var result = scheduler.CreateObserver(); inners[group.Key] = group; res[group.Key] = result; innerSubscriptions[group.Key] = group.Subscribe(result); }, _ => { })); scheduler.ScheduleAbsolute(Disposed, () => { outerSubscription.Dispose(); foreach (var d in innerSubscriptions.Values) { d.Dispose(); } }); scheduler.Start(); Assert.Equal(4, inners.Count); res["foo"].Messages.AssertEqual( OnNext(220, "oof "), OnNext(240, " OoF "), OnNext(310, " Oof"), OnCompleted(310) ); res["baR"].Messages.AssertEqual( OnNext(270, " Rab"), OnNext(390, "rab "), OnError(420, comparer.EqualsException) ); res["Baz"].Messages.AssertEqual( OnNext(350, " zaB "), OnError(420, comparer.EqualsException) ); res["qux"].Messages.AssertEqual( OnNext(360, " xuq "), OnError(420, comparer.EqualsException) ); xs.Subscriptions.AssertEqual( Subscribe(200, 420) ); }

        // Uses a GroupByComparer constructed with (scheduler, ushort.MaxValue, 400) and
        // subscribes to every group as it is emitted.
        // Asserts four groups, that the "baR", "Baz" and "qux" observers record
        // OnError(420, comparer.HashCodeException), and a source subscription of 200-420.
        // NOTE(review): presumably 400 is the virtual time after which the comparer's
        // GetHashCode starts throwing - confirm against GroupByComparer.
        [TestMethod]
        public void GroupByUntil_Capacity_Inner_Comparer_GetHashCodeThrow()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"),
                OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "),
                OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "),
                OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "),
                OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "),
                OnCompleted(570), OnNext(580, "error"), OnCompleted(600),
                OnError(650, new Exception())
            );

            var comparer = new GroupByComparer(scheduler, ushort.MaxValue, 400);
            var outer = default(IObservable>);
            var outerSubscription = default(IDisposable);
            // Per-key bookkeeping: the group itself, its subscription and its recording observer.
            var inners = new Dictionary>();
            var innerSubscriptions = new Dictionary();
            var res = new Dictionary>();

            scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer));

            scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription =
outer.Subscribe(group => { var result = scheduler.CreateObserver(); inners[group.Key] = group; res[group.Key] = result; innerSubscriptions[group.Key] = group.Subscribe(result); }, _ => { })); scheduler.ScheduleAbsolute(Disposed, () => { outerSubscription.Dispose(); foreach (var d in innerSubscriptions.Values) { d.Dispose(); } }); scheduler.Start(); Assert.Equal(4, inners.Count); res["foo"].Messages.AssertEqual( OnNext(220, "oof "), OnNext(240, " OoF "), OnNext(310, " Oof"), OnCompleted(310) ); res["baR"].Messages.AssertEqual( OnNext(270, " Rab"), OnNext(390, "rab "), OnError(420, comparer.HashCodeException) ); res["Baz"].Messages.AssertEqual( OnNext(350, " zaB "), OnError(420, comparer.HashCodeException) ); res["qux"].Messages.AssertEqual( OnNext(360, " xuq "), OnError(420, comparer.HashCodeException) ); xs.Subscriptions.AssertEqual( Subscribe(200, 420) ); } [TestMethod] public void GroupByUntil_Capacity_Outer_Independence() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"), OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "), OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "), OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "), OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "), OnCompleted(570), OnNext(580, "error"), OnCompleted(600), OnError(650, new Exception()) ); var comparer = new GroupByComparer(scheduler); var outer = default(IObservable>); var outerSubscription = default(IDisposable); var inners = new Dictionary>(); var innerSubscriptions = new Dictionary(); var res = new Dictionary>(); var outerResults = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer)); scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => { outerResults.OnNext(group.Key); var result = 
scheduler.CreateObserver(); inners[group.Key] = group; res[group.Key] = result; innerSubscriptions[group.Key] = group.Subscribe(result); }, outerResults.OnError, outerResults.OnCompleted)); scheduler.ScheduleAbsolute(Disposed, () => { outerSubscription.Dispose(); foreach (var d in innerSubscriptions.Values) { d.Dispose(); } }); scheduler.ScheduleAbsolute(320, () => outerSubscription.Dispose()); scheduler.Start(); Assert.Equal(2, inners.Count); outerResults.Messages.AssertEqual( OnNext(220, "foo"), OnNext(270, "baR") ); res["foo"].Messages.AssertEqual( OnNext(220, "oof "), OnNext(240, " OoF "), OnNext(310, " Oof"), OnCompleted(310) ); res["baR"].Messages.AssertEqual( OnNext(270, " Rab"), OnNext(390, "rab "), OnNext(420, " RAB "), OnCompleted(420) ); xs.Subscriptions.AssertEqual( Subscribe(200, 420) ); } [TestMethod] public void GroupByUntil_Capacity_Inner_Independence() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"), OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "), OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "), OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "), OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "), OnCompleted(570), OnNext(580, "error"), OnCompleted(600), OnError(650, new Exception()) ); var comparer = new GroupByComparer(scheduler); var outer = default(IObservable>); var outerSubscription = default(IDisposable); var inners = new Dictionary>(); var innerSubscriptions = new Dictionary(); var res = new Dictionary>(); var outerResults = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer)); scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => { outerResults.OnNext(group.Key); var result = scheduler.CreateObserver(); inners[group.Key] = group; 
res[group.Key] = result; innerSubscriptions[group.Key] = group.Subscribe(result); }, outerResults.OnError, outerResults.OnCompleted)); scheduler.ScheduleAbsolute(Disposed, () => { outerSubscription.Dispose(); foreach (var d in innerSubscriptions.Values) { d.Dispose(); } }); scheduler.ScheduleAbsolute(320, () => innerSubscriptions["foo"].Dispose()); scheduler.Start(); Assert.Equal(5, inners.Count); res["foo"].Messages.AssertEqual( OnNext(220, "oof "), OnNext(240, " OoF "), OnNext(310, " Oof"), OnCompleted(310) ); res["baR"].Messages.AssertEqual( OnNext(270, " Rab"), OnNext(390, "rab "), OnNext(420, " RAB "), OnCompleted(420) ); res["Baz"].Messages.AssertEqual( OnNext(350, " zaB "), OnNext(480, " zab"), OnNext(510, " ZAb "), OnCompleted(510) ); res["qux"].Messages.AssertEqual( OnNext(360, " xuq "), OnCompleted(570) ); res["FOO"].Messages.AssertEqual( OnNext(470, " OOF"), OnNext(530, " oOf "), OnCompleted(570) ); xs.Subscriptions.AssertEqual( Subscribe(200, 570) ); } [TestMethod] public void GroupByUntil_Capacity_Inner_Multiple_Independence() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"), OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "), OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "), OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "), OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "), OnCompleted(570), OnNext(580, "error"), OnCompleted(600), OnError(650, new Exception()) ); var comparer = new GroupByComparer(scheduler); var outer = default(IObservable>); var outerSubscription = default(IDisposable); var inners = new Dictionary>(); var innerSubscriptions = new Dictionary(); var res = new Dictionary>(); var outerResults = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer)); 
scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => { outerResults.OnNext(group.Key); var result = scheduler.CreateObserver(); inners[group.Key] = group; res[group.Key] = result; innerSubscriptions[group.Key] = group.Subscribe(result); }, outerResults.OnError, outerResults.OnCompleted)); scheduler.ScheduleAbsolute(Disposed, () => { outerSubscription.Dispose(); foreach (var d in innerSubscriptions.Values) { d.Dispose(); } }); scheduler.ScheduleAbsolute(320, () => innerSubscriptions["foo"].Dispose()); scheduler.ScheduleAbsolute(280, () => innerSubscriptions["baR"].Dispose()); scheduler.ScheduleAbsolute(355, () => innerSubscriptions["Baz"].Dispose()); scheduler.ScheduleAbsolute(400, () => innerSubscriptions["qux"].Dispose()); scheduler.Start(); Assert.Equal(5, inners.Count); res["foo"].Messages.AssertEqual( OnNext(220, "oof "), OnNext(240, " OoF "), OnNext(310, " Oof"), OnCompleted(310) ); res["baR"].Messages.AssertEqual( OnNext(270, " Rab") ); res["Baz"].Messages.AssertEqual( OnNext(350, " zaB ") ); res["qux"].Messages.AssertEqual( OnNext(360, " xuq ") ); res["FOO"].Messages.AssertEqual( OnNext(470, " OOF"), OnNext(530, " oOf "), OnCompleted(570) ); xs.Subscriptions.AssertEqual( Subscribe(200, 570) ); } [TestMethod] public void GroupByUntil_Capacity_Inner_Escape_Complete() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(310, "foO "), OnNext(470, "FOO "), OnNext(530, " fOo "), OnCompleted(570) ); var outer = default(IObservable>); var outerSubscription = default(IDisposable); var inner = default(IObservable); var innerSubscription = default(IDisposable); var res = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), g => g.Skip(2), _groupByUntilCapacity)); scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => { inner = group; })); 
scheduler.ScheduleAbsolute(600, () => innerSubscription = inner.Subscribe(res)); scheduler.ScheduleAbsolute(Disposed, () => { outerSubscription.Dispose(); innerSubscription.Dispose(); }); scheduler.Start(); xs.Subscriptions.AssertEqual( Subscribe(200, 570) ); res.Messages.AssertEqual( OnCompleted(600) ); } [TestMethod] public void GroupByUntil_Capacity_Inner_Escape_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(310, "foO "), OnNext(470, "FOO "), OnNext(530, " fOo "), OnError(570, ex) ); var outer = default(IObservable>); var outerSubscription = default(IDisposable); var inner = default(IObservable); var innerSubscription = default(IDisposable); var res = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), g => g.Skip(2), _groupByUntilCapacity)); scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => { inner = group; }, _ => { })); scheduler.ScheduleAbsolute(600, () => innerSubscription = inner.Subscribe(res)); scheduler.ScheduleAbsolute(Disposed, () => { outerSubscription.Dispose(); innerSubscription.Dispose(); }); scheduler.Start(); xs.Subscriptions.AssertEqual( Subscribe(200, 570) ); res.Messages.AssertEqual( OnError(600, ex) ); } [TestMethod] public void GroupByUntil_Capacity_Inner_Escape_Dispose() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(310, "foO "), OnNext(470, "FOO "), OnNext(530, " fOo "), OnError(570, new Exception()) ); var outer = default(IObservable>); var outerSubscription = default(IDisposable); var inner = default(IObservable); var innerSubscription = default(IDisposable); var res = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), g => g.Skip(2), _groupByUntilCapacity)); 
scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => { inner = group; })); scheduler.ScheduleAbsolute(290, () => outerSubscription.Dispose()); scheduler.ScheduleAbsolute(600, () => innerSubscription = inner.Subscribe(res)); scheduler.ScheduleAbsolute(Disposed, () => { innerSubscription.Dispose(); }); scheduler.Start(); xs.Subscriptions.AssertEqual( Subscribe(200, 290) ); res.Messages.AssertEqual( ); } [TestMethod] public void GroupByUntil_Capacity_Default() { var scheduler = new TestScheduler(); var keyInvoked = 0; var eleInvoked = 0; var xs = scheduler.CreateHotObservable( OnNext(90, "error"), OnNext(110, "error"), OnNext(130, "error"), OnNext(220, " foo"), OnNext(240, " FoO "), OnNext(270, "baR "), OnNext(310, "foO "), OnNext(350, " Baz "), OnNext(360, " qux "), OnNext(390, " bar"), OnNext(420, " BAR "), OnNext(470, "FOO "), OnNext(480, "baz "), OnNext(510, " bAZ "), OnNext(530, " fOo "), OnCompleted(570), OnNext(580, "error"), OnCompleted(600), OnError(650, new Exception()) ); var res = scheduler.Start(() => xs.GroupByUntil( x => { keyInvoked++; return x.Trim().ToLower(); }, x => { eleInvoked++; return Reverse(x); }, g => g.Skip(2), _groupByUntilCapacity ).Select(g => g.Key) ); res.Messages.AssertEqual( OnNext(220, "foo"), OnNext(270, "bar"), OnNext(350, "baz"), OnNext(360, "qux"), OnNext(470, "foo"), OnCompleted(570) ); xs.Subscriptions.AssertEqual( Subscribe(200, 570) ); Assert.Equal(12, keyInvoked); Assert.Equal(12, eleInvoked); } [TestMethod] public void GroupByUntil_Capacity_DurationSelector_Throws() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, "foo") ); var ex = new Exception(); var res = scheduler.Start(() => xs.GroupByUntil(x => x, g => { throw ex; }, _groupByUntilCapacity) ); res.Messages.AssertEqual( OnError>(210, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 210) ); } [TestMethod] public void GroupByUntil_Capacity_NullKeys_Simple_Never() { var scheduler = new 
TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(220, "bar"), OnNext(240, "foo"), OnNext(310, "qux"), OnNext(470, "baz"), OnCompleted(500) ); var res = scheduler.Start(() => xs.GroupByUntil(x => x[0] == 'b' ? null : x.ToUpper(), g => Observable.Never(), _groupByUntilCapacity).SelectMany(g => g, (g, x) => (g.Key ?? "(null)") + x)); res.Messages.AssertEqual( OnNext(220, "(null)bar"), OnNext(240, "FOOfoo"), OnNext(310, "QUXqux"), OnNext(470, "(null)baz"), OnCompleted(500) ); xs.Subscriptions.AssertEqual( Subscribe(200, 500) ); } [TestMethod] public void GroupByUntil_Capacity_NullKeys_Simple_Expire1() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(220, "bar"), OnNext(240, "foo"), OnNext(310, "qux"), OnNext(470, "baz"), OnCompleted(500) ); var n = 0; var res = scheduler.Start(() => xs.GroupByUntil(x => x[0] == 'b' ? null : x.ToUpper(), g => { if (g.Key == null) { n++; } return Observable.Timer(TimeSpan.FromTicks(50), scheduler); }, _groupByUntilCapacity).SelectMany(g => g, (g, x) => (g.Key ?? "(null)") + x)); Assert.Equal(2, n); res.Messages.AssertEqual( OnNext(220, "(null)bar"), OnNext(240, "FOOfoo"), OnNext(310, "QUXqux"), OnNext(470, "(null)baz"), OnCompleted(500) ); xs.Subscriptions.AssertEqual( Subscribe(200, 500) ); } [TestMethod] public void GroupByUntil_Capacity_NullKeys_Simple_Expire2() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(220, "bar"), OnNext(240, "foo"), OnNext(310, "qux"), OnNext(470, "baz"), OnCompleted(500) ); var n = 0; var res = scheduler.Start(() => xs.GroupByUntil(x => x[0] == 'b' ? null : x.ToUpper(), g => { if (g.Key == null) { n++; } return Observable.Timer(TimeSpan.FromTicks(50), scheduler).IgnoreElements(); }, _groupByUntilCapacity).SelectMany(g => g, (g, x) => (g.Key ?? 
"(null)") + x)); Assert.Equal(2, n); res.Messages.AssertEqual( OnNext(220, "(null)bar"), OnNext(240, "FOOfoo"), OnNext(310, "QUXqux"), OnNext(470, "(null)baz"), OnCompleted(500) ); xs.Subscriptions.AssertEqual( Subscribe(200, 500) ); } [TestMethod] public void GroupByUntil_Capacity_NullKeys_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(220, "bar"), OnNext(240, "foo"), OnNext(310, "qux"), OnNext(470, "baz"), OnError(500, ex) ); var nullGroup = scheduler.CreateObserver(); var err = default(Exception); scheduler.ScheduleAbsolute(200, () => xs.GroupByUntil(x => x[0] == 'b' ? null : x.ToUpper(), g => Observable.Never(), _groupByUntilCapacity).Where(g => g.Key == null).Subscribe(g => g.Subscribe(nullGroup), ex_ => err = ex_)); scheduler.Start(); Assert.Same(ex, err); nullGroup.Messages.AssertEqual( OnNext(220, "bar"), OnNext(470, "baz"), OnError(500, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 500) ); } #endregion private static string Reverse(string s) { var sb = new StringBuilder(); for (var i = s.Length - 1; i >= 0; i--) { sb.Append(s[i]); } return sb.ToString(); } } }