// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. using System; using System.Collections.Generic; using System.Linq; using System.Reactive.Linq; using Microsoft.Reactive.Testing; using ReactiveTests.Dummies; using Microsoft.VisualStudio.TestTools.UnitTesting; namespace ReactiveTests.Tests { [TestClass] public class ForkJoinTest : ReactiveTest { [TestMethod] public void ForkJoin_ArgumentChecking() { var someObservable = DummyObservable.Instance; ReactiveAssert.Throws(() => ObservableEx.ForkJoin(someObservable, someObservable, (Func)null)); ReactiveAssert.Throws(() => ObservableEx.ForkJoin(someObservable, (IObservable)null, (_, __) => _ + __)); ReactiveAssert.Throws(() => ObservableEx.ForkJoin((IObservable)null, someObservable, (_, __) => _ + __)); ReactiveAssert.Throws(() => ObservableEx.ForkJoin((IObservable[])null)); ReactiveAssert.Throws(() => ObservableEx.ForkJoin((IEnumerable>)null)); } [TestMethod] public void ForkJoin_EmptyEmpty() { var scheduler = new TestScheduler(); var msgs1 = new[] { OnNext(150, 1), OnCompleted(230) }; var msgs2 = new[] { OnNext(150, 1), OnCompleted(250) }; var o = scheduler.CreateHotObservable(msgs1); var e = scheduler.CreateHotObservable(msgs2); var res = scheduler.Start(() => e.ForkJoin(o, (x, y) => x + y)); res.Messages.AssertEqual( OnCompleted(250) ); } [TestMethod] public void ForkJoin_None() { var scheduler = new TestScheduler(); var res = scheduler.Start(() => ObservableEx.ForkJoin()); res.Messages.AssertEqual( OnCompleted(200) ); } [TestMethod] public void ForkJoin_EmptyReturn() { var scheduler = new TestScheduler(); var msgs1 = new[] { OnNext(150, 1), OnCompleted(230) }; var msgs2 = new[] { OnNext(150, 1), OnNext(210, 2), OnCompleted(250) }; var o = scheduler.CreateHotObservable(msgs1); var e = scheduler.CreateHotObservable(msgs2); var res = scheduler.Start(() => e.ForkJoin(o, (x, y) => x + y)); res.Messages.AssertEqual( OnCompleted(250) ); } [TestMethod] public void ForkJoin_ReturnEmpty() { var scheduler = new TestScheduler(); var msgs1 = new[] { OnNext(150, 1), OnNext(210, 2), OnCompleted(230) }; var msgs2 = new[] { OnNext(150, 1), OnCompleted(250) }; var o = scheduler.CreateHotObservable(msgs1); var e = scheduler.CreateHotObservable(msgs2); var res = scheduler.Start(() => e.ForkJoin(o, (x, y) => x + y)); res.Messages.AssertEqual( OnCompleted(250) ); } [TestMethod] public void ForkJoin_ReturnReturn() { var scheduler = new TestScheduler(); var msgs1 = new[] { OnNext(150, 1), OnNext(210, 2), OnCompleted(230) }; var msgs2 = new[] { OnNext(150, 1), OnNext(220, 3), OnCompleted(250) }; var o = scheduler.CreateHotObservable(msgs1); var e = scheduler.CreateHotObservable(msgs2); var res = scheduler.Start(() => e.ForkJoin(o, (x, y) => x + y)); res.Messages.AssertEqual( OnNext(250, 2 + 3), OnCompleted(250) ); } [TestMethod] public void ForkJoin_EmptyThrow() { var ex = new Exception(); var scheduler = new TestScheduler(); var msgs1 = new[] { OnNext(150, 1), OnCompleted(230) }; var msgs2 = new[] { OnNext(150, 1), OnError(210, ex), OnCompleted(250) }; var o = scheduler.CreateHotObservable(msgs1); var e = scheduler.CreateHotObservable(msgs2); var res = scheduler.Start(() => e.ForkJoin(o, (x, y) => x + y)); res.Messages.AssertEqual( OnError(210, ex) ); } [TestMethod] public void ForkJoin_ThrowEmpty() { var ex = new Exception(); var scheduler = new TestScheduler(); var msgs1 = new[] { OnNext(150, 1), OnError(210, ex), OnCompleted(230) }; var msgs2 = new[] { OnNext(150, 1), OnCompleted(250) }; var o = scheduler.CreateHotObservable(msgs1); var e = scheduler.CreateHotObservable(msgs2); var res = scheduler.Start(() => e.ForkJoin(o, (x, y) => x + y)); res.Messages.AssertEqual( OnError(210, ex) ); } [TestMethod] public void ForkJoin_ReturnThrow() { var ex = new Exception(); var scheduler = new TestScheduler(); var msgs1 = new[] { OnNext(150, 1), OnNext(210, 2), OnCompleted(230) }; var msgs2 = new[] { OnNext(150, 1), OnError(220, ex), OnCompleted(250) }; var o = scheduler.CreateHotObservable(msgs1); var e = scheduler.CreateHotObservable(msgs2); var res = scheduler.Start(() => e.ForkJoin(o, (x, y) => x + y)); res.Messages.AssertEqual( OnError(220, ex) ); } [TestMethod] public void ForkJoin_ThrowReturn() { var ex = new Exception(); var scheduler = new TestScheduler(); var msgs1 = new[] { OnNext(150, 1), OnError(220, ex), OnCompleted(230) }; var msgs2 = new[] { OnNext(150, 1), OnNext(210, 2), OnCompleted(250) }; var o = scheduler.CreateHotObservable(msgs1); var e = scheduler.CreateHotObservable(msgs2); var res = scheduler.Start(() => e.ForkJoin(o, (x, y) => x + y)); res.Messages.AssertEqual( OnError(220, ex) ); } [TestMethod] public void ForkJoin_Binary() { var scheduler = new TestScheduler(); var msgs1 = new[] { OnNext(150, 1), OnNext(215, 2), OnNext(225, 4), OnCompleted(230) }; var msgs2 = new[] { OnNext(150, 1), OnNext(235, 6), OnNext(240, 7), OnCompleted(250) }; var o = scheduler.CreateHotObservable(msgs1); var e = scheduler.CreateHotObservable(msgs2); var res = scheduler.Start(() => e.ForkJoin(o, (x, y) => x + y)); res.Messages.AssertEqual( OnNext(250, 4 + 7), // TODO: fix ForkJoin behavior OnCompleted(250) ); } [TestMethod] public void ForkJoin_NaryParams() { var scheduler = new TestScheduler(); var msgs1 = new[] { OnNext(150, 1), OnNext(215, 2), OnNext(225, 4), OnCompleted(230) }; var msgs2 = new[] { OnNext(150, 1), OnNext(235, 6), OnNext(240, 7), OnCompleted(250) }; var msgs3 = new[] { OnNext(150, 1), OnNext(230, 3), OnNext(245, 5), OnCompleted(270) }; var o1 = scheduler.CreateHotObservable(msgs1); var o2 = scheduler.CreateHotObservable(msgs2); var o3 = scheduler.CreateHotObservable(msgs3); var res = scheduler.Start(() => ObservableEx.ForkJoin(o1, o2, o3)); res.Messages.AssertEqual( OnNext(270, l => l.SequenceEqual(new[] { 4, 7, 5 })), // TODO: fix ForkJoin behavior OnCompleted(270) ); } [TestMethod] public void ForkJoin_NaryParamsEmpty() { var scheduler = new TestScheduler(); var msgs1 = new[] { OnNext(150, 1), OnNext(215, 2), OnNext(225, 4), OnCompleted(230) }; var msgs2 = new[] { OnNext(150, 1), OnNext(235, 6), OnNext(240, 7), OnCompleted(250) }; var msgs3 = new[] { OnCompleted(270) }; var o1 = scheduler.CreateHotObservable(msgs1); var o2 = scheduler.CreateHotObservable(msgs2); var o3 = scheduler.CreateHotObservable(msgs3); var res = scheduler.Start(() => ObservableEx.ForkJoin(o1, o2, o3)); res.Messages.AssertEqual( OnCompleted(270) ); } [TestMethod] public void ForkJoin_NaryParamsEmptyBeforeEnd() { var scheduler = new TestScheduler(); var msgs1 = new[] { OnNext(150, 1), OnNext(215, 2), OnNext(225, 4), OnCompleted(230) }; var msgs2 = new[] { OnCompleted(235) }; var msgs3 = new[] { OnNext(150, 1), OnNext(230, 3), OnNext(245, 5), OnCompleted(270) }; var o1 = scheduler.CreateHotObservable(msgs1); var o2 = scheduler.CreateHotObservable(msgs2); var o3 = scheduler.CreateHotObservable(msgs3); var res = scheduler.Start(() => ObservableEx.ForkJoin(o1, o2, o3)); res.Messages.AssertEqual( OnCompleted(235) ); } [TestMethod] public void ForkJoin_Nary_Immediate() { ObservableEx.ForkJoin(Observable.Return(1), Observable.Return(2)).First().SequenceEqual(new[] { 1, 2 }); } [TestMethod] public void ForkJoin_Nary_Virtual_And_Immediate() { var scheduler = new TestScheduler(); var msgs1 = new[] { OnNext(150, 1), OnNext(215, 2), OnNext(225, 4), OnCompleted(230) }; var msgs2 = new[] { OnNext(150, 1), OnNext(235, 6), OnNext(240, 7), OnCompleted(250) }; var msgs3 = new[] { OnNext(150, 1), OnNext(230, 3), OnNext(245, 5), OnCompleted(270) }; var o1 = scheduler.CreateHotObservable(msgs1); var o2 = scheduler.CreateHotObservable(msgs2); var o3 = scheduler.CreateHotObservable(msgs3); var res = scheduler.Start(() => ObservableEx.ForkJoin(new List> { o1, o2, o3, Observable.Return(20) })); res.Messages.AssertEqual( OnNext(270, l => l.SequenceEqual(new[] { 4, 7, 5, 20 })), OnCompleted(270) ); } [TestMethod] public void ForkJoin_Nary_Immediate_And_Virtual() { var scheduler = new TestScheduler(); var msgs1 = new[] { OnNext(150, 1), OnNext(215, 2), OnNext(225, 4), OnCompleted(230) }; var msgs2 = new[] { OnNext(150, 1), OnNext(235, 6), OnNext(240, 7), OnCompleted(250) }; var msgs3 = new[] { OnNext(150, 1), OnNext(230, 3), OnNext(245, 5), OnCompleted(270) }; var o1 = scheduler.CreateHotObservable(msgs1); var o2 = scheduler.CreateHotObservable(msgs2); var o3 = scheduler.CreateHotObservable(msgs3); var res = scheduler.Start(() => ObservableEx.ForkJoin(new List> { Observable.Return(20), o1, o2, o3 })); res.Messages.AssertEqual( OnNext(270, l => l.SequenceEqual(new[] { 20, 4, 7, 5 })), OnCompleted(270) ); } [TestMethod] public void ForkJoin_Nary() { var scheduler = new TestScheduler(); var msgs1 = new[] { OnNext(150, 1), OnNext(215, 2), OnNext(225, 4), OnCompleted(230) }; var msgs2 = new[] { OnNext(150, 1), OnNext(235, 6), OnNext(240, 7), OnCompleted(250) }; var msgs3 = new[] { OnNext(150, 1), OnNext(230, 3), OnNext(245, 5), OnCompleted(270) }; var o1 = scheduler.CreateHotObservable(msgs1); var o2 = scheduler.CreateHotObservable(msgs2); var o3 = scheduler.CreateHotObservable(msgs3); var res = scheduler.Start(() => ObservableEx.ForkJoin(new List> { o1, o2, o3 })); res.Messages.AssertEqual( OnNext(270, l => l.SequenceEqual(new[] { 4, 7, 5 })), // TODO: fix ForkJoin behavior OnCompleted(270) ); } [TestMethod] public void Bug_1302_SelectorThrows_LeftLast() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnCompleted(220) ); var ys = scheduler.CreateHotObservable( OnNext(215, 2), OnCompleted(217) ); var ex = new Exception(); var results = scheduler.Start(() => xs.ForkJoin(ys, (x, y) => { throw ex; })); results.Messages.AssertEqual( OnError(220, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 220) ); ys.Subscriptions.AssertEqual( Subscribe(200, 217) ); } [TestMethod] public void Bug_1302_SelectorThrows_RightLast() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnCompleted(217) ); var ys = scheduler.CreateHotObservable( OnNext(215, 2), OnCompleted(220) ); var ex = new Exception(); var results = scheduler.Start(() => xs.ForkJoin(ys, (x, y) => { throw ex; })); results.Messages.AssertEqual( OnError(220, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 217) ); ys.Subscriptions.AssertEqual( Subscribe(200, 220) ); } [TestMethod] public void Bug_1302_RightLast_NoLeft() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnCompleted(217) ); var ys = scheduler.CreateHotObservable( OnNext(215, 2), OnCompleted(220) ); var results = scheduler.Start(() => xs.ForkJoin(ys, (x, y) => x + y)); results.Messages.AssertEqual( OnCompleted(220) ); xs.Subscriptions.AssertEqual( Subscribe(200, 217) ); ys.Subscriptions.AssertEqual( Subscribe(200, 220) ); } [TestMethod] public void Bug_1302_RightLast_NoRight() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(215, 2), OnCompleted(217) ); var ys = scheduler.CreateHotObservable( OnCompleted(220) ); var results = scheduler.Start(() => xs.ForkJoin(ys, (x, y) => x + y)); results.Messages.AssertEqual( OnCompleted(220) ); xs.Subscriptions.AssertEqual( Subscribe(200, 217) ); ys.Subscriptions.AssertEqual( Subscribe(200, 220) ); } } }