// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;
using ReactiveTests.Dummies;
using Microsoft.VisualStudio.TestTools.UnitTesting;

using Assert = Xunit.Assert;

namespace ReactiveTests.Tests
{
    // Tests for Observable.Create (Action- and IDisposable-returning subscribe functions)
    // and for the iterator-based ObservableEx.Create overloads.
    //
    // Virtual-time expectations below are consistent with TestScheduler.Start subscribing
    // at tick 200 and disposing at tick 1000 (see e.g. Subscribe(280, 1000)).
    //
    // NOTE(review): the generic type arguments in this file (<int>, <Unit>,
    // <ArgumentNullException>, <InvalidOperationException>, ...) were reconstructed from
    // usage after having been stripped from the text; confirm against the upstream file.
    [TestClass]
    public class CreateTest : ReactiveTest
    {
        // Null subscribe function and null observers are rejected.
        [TestMethod]
        public void Create_ArgumentChecking()
        {
            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create(default(Func<IObserver<int>, Action>)));

            //
            // BREAKING CHANGE v2.0 > v1.x - Returning null from Subscribe means "nothing to do upon unsubscription"
            //                               all null-coalesces to Disposable.Empty.
            //
            //ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(o => default(Action)).Subscribe(DummyObserver<int>.Instance));

            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(o => () => { }).Subscribe(null));
            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(o =>
            {
                o.OnError(null);
                return () => { };
            }).Subscribe(null));
        }

        // A null Action returned from the subscribe function must not break Dispose.
        [TestMethod]
        public void Create_NullCoalescingAction()
        {
            var xs = Observable.Create<int>(o =>
            {
                o.OnNext(42);
                return default(Action);
            });

            var lst = new List<int>();
            var d = xs.Subscribe(lst.Add);
            d.Dispose();

            Assert.True(lst.SequenceEqual([42]));
        }

        // Values pushed synchronously during subscription are observed at the subscription time.
        [TestMethod]
        public void Create_Next()
        {
            var scheduler = new TestScheduler();

            var res = scheduler.Start(() =>
                Observable.Create<int>(o =>
                {
                    o.OnNext(1);
                    o.OnNext(2);
                    return () => { };
                })
            );

            res.Messages.AssertEqual(
                OnNext(200, 1),
                OnNext(200, 2)
            );
        }

        // Only the first terminal notification is expected; everything sent after it is dropped.
        [TestMethod]
        public void Create_Completed()
        {
            var scheduler = new TestScheduler();

            var res = scheduler.Start(() =>
                Observable.Create<int>(o =>
                {
                    o.OnCompleted();
                    o.OnNext(100);
                    o.OnError(new Exception());
                    o.OnCompleted();
                    return () => { };
                })
            );

            res.Messages.AssertEqual(
                OnCompleted<int>(200)
            );
        }

        // Only the first error is expected; everything sent after it is dropped.
        [TestMethod]
        public void Create_Error()
        {
            var scheduler = new TestScheduler();

            var ex = new Exception();

            var res = scheduler.Start(() =>
                Observable.Create<int>(o =>
                {
                    o.OnError(ex);
                    o.OnNext(100);
                    o.OnError(new Exception());
                    o.OnCompleted();
                    return () => { };
                })
            );

            res.Messages.AssertEqual(
                OnError<int>(200, ex)
            );
        }

        // An exception thrown by the subscribe function surfaces from Subscribe.
        [TestMethod]
        public void Create_Exception()
        {
            ReactiveAssert.Throws<InvalidOperationException>(() =>
                Observable.Create<int>(new Func<IObserver<int>, Action>(o => { throw new InvalidOperationException(); })).Subscribe());
        }

        // The returned Action flips 'stopped'; values 5 and 6 (scheduled for ticks 1100 and 1300)
        // are not expected in the recorded messages.
        [TestMethod]
        public void Create_Dispose()
        {
            var scheduler = new TestScheduler();

            var res = scheduler.Start(() =>
                Observable.Create<int>(o =>
                {
                    var stopped = false;

                    o.OnNext(1);
                    o.OnNext(2);

                    // Due times are relative to the subscription time (200).
                    scheduler.Schedule(TimeSpan.FromTicks(600), () =>
                    {
                        if (!stopped)
                        {
                            o.OnNext(3);
                        }
                    });
                    scheduler.Schedule(TimeSpan.FromTicks(700), () =>
                    {
                        if (!stopped)
                        {
                            o.OnNext(4);
                        }
                    });
                    scheduler.Schedule(TimeSpan.FromTicks(900), () =>
                    {
                        if (!stopped)
                        {
                            o.OnNext(5);
                        }
                    });
                    scheduler.Schedule(TimeSpan.FromTicks(1100), () =>
                    {
                        if (!stopped)
                        {
                            o.OnNext(6);
                        }
                    });

                    return () => { stopped = true; };
                })
            );

            res.Messages.AssertEqual(
                OnNext(200, 1),
                OnNext(200, 2),
                OnNext(800, 3),
                OnNext(900, 4)
            );
        }

        // Exceptions thrown by observer callbacks propagate out of Subscribe.
        [TestMethod]
        public void Create_ObserverThrows()
        {
            ReactiveAssert.Throws<InvalidOperationException>(() =>
                Observable.Create<int>(o =>
                {
                    o.OnNext(1);
                    return () => { };
                }).Subscribe(x => { throw new InvalidOperationException(); }));

            ReactiveAssert.Throws<InvalidOperationException>(() =>
                Observable.Create<int>(o =>
                {
                    o.OnError(new Exception());
                    return () => { };
                }).Subscribe(x => { }, ex => { throw new InvalidOperationException(); }));

            ReactiveAssert.Throws<InvalidOperationException>(() =>
                Observable.Create<int>(o =>
                {
                    o.OnCompleted();
                    return () => { };
                }).Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); }));
        }

        // Same argument checks as Create_ArgumentChecking, for the IDisposable-returning overload.
        [TestMethod]
        public void CreateWithDisposable_ArgumentChecking()
        {
            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create(default(Func<IObserver<int>, IDisposable>)));
            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(o => DummyDisposable.Instance).Subscribe(null));
            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(o =>
            {
                o.OnError(null);
                return DummyDisposable.Instance;
            }).Subscribe(null));
        }

        // A null IDisposable returned from the subscribe function must not break Dispose.
        [TestMethod]
        public void CreateWithDisposable_NullCoalescingAction()
        {
            var xs = Observable.Create<int>(o =>
            {
                o.OnNext(42);
                return default(IDisposable);
            });

            var lst = new List<int>();
            var d = xs.Subscribe(lst.Add);
            d.Dispose();

            Assert.True(lst.SequenceEqual([42]));
        }

        // Values pushed synchronously during subscription are observed at the subscription time.
        [TestMethod]
        public void CreateWithDisposable_Next()
        {
            var scheduler = new TestScheduler();

            var res = scheduler.Start(() =>
                Observable.Create<int>(o =>
                {
                    o.OnNext(1);
                    o.OnNext(2);
                    return Disposable.Empty;
                })
            );

            res.Messages.AssertEqual(
                OnNext(200, 1),
                OnNext(200, 2)
            );
        }

        // Only the first terminal notification is expected; everything sent after it is dropped.
        [TestMethod]
        public void CreateWithDisposable_Completed()
        {
            var scheduler = new TestScheduler();

            var res = scheduler.Start(() =>
                Observable.Create<int>(o =>
                {
                    o.OnCompleted();
                    o.OnNext(100);
                    o.OnError(new Exception());
                    o.OnCompleted();
                    return Disposable.Empty;
                })
            );

            res.Messages.AssertEqual(
                OnCompleted<int>(200)
            );
        }

        // Only the first error is expected; everything sent after it is dropped.
        [TestMethod]
        public void CreateWithDisposable_Error()
        {
            var scheduler = new TestScheduler();

            var ex = new Exception();

            var res = scheduler.Start(() =>
                Observable.Create<int>(o =>
                {
                    o.OnError(ex);
                    o.OnNext(100);
                    o.OnError(new Exception());
                    o.OnCompleted();
                    return Disposable.Empty;
                })
            );

            res.Messages.AssertEqual(
                OnError<int>(200, ex)
            );
        }

        // An exception thrown by the subscribe function surfaces from Subscribe.
        [TestMethod]
        public void CreateWithDisposable_Exception()
        {
            ReactiveAssert.Throws<InvalidOperationException>(() =>
                Observable.Create<int>(new Func<IObserver<int>, IDisposable>(o => { throw new InvalidOperationException(); })).Subscribe());
        }

        // The returned BooleanDisposable gates the scheduled work; values 5 and 6
        // (scheduled for ticks 1100 and 1300) are not expected in the recorded messages.
        [TestMethod]
        public void CreateWithDisposable_Dispose()
        {
            var scheduler = new TestScheduler();

            var res = scheduler.Start(() =>
                Observable.Create<int>(o =>
                {
                    var d = new BooleanDisposable();

                    o.OnNext(1);
                    o.OnNext(2);

                    // Due times are relative to the subscription time (200).
                    scheduler.Schedule(TimeSpan.FromTicks(600), () =>
                    {
                        if (!d.IsDisposed)
                        {
                            o.OnNext(3);
                        }
                    });
                    scheduler.Schedule(TimeSpan.FromTicks(700), () =>
                    {
                        if (!d.IsDisposed)
                        {
                            o.OnNext(4);
                        }
                    });
                    scheduler.Schedule(TimeSpan.FromTicks(900), () =>
                    {
                        if (!d.IsDisposed)
                        {
                            o.OnNext(5);
                        }
                    });
                    scheduler.Schedule(TimeSpan.FromTicks(1100), () =>
                    {
                        if (!d.IsDisposed)
                        {
                            o.OnNext(6);
                        }
                    });

                    return d;
                })
            );

            res.Messages.AssertEqual(
                OnNext(200, 1),
                OnNext(200, 2),
                OnNext(800, 3),
                OnNext(900, 4)
            );
        }

        // Exceptions thrown by observer callbacks propagate out of Subscribe.
        [TestMethod]
        public void CreateWithDisposable_ObserverThrows()
        {
            ReactiveAssert.Throws<InvalidOperationException>(() =>
                Observable.Create<int>(o =>
                {
                    o.OnNext(1);
                    return Disposable.Empty;
                }).Subscribe(x => { throw new InvalidOperationException(); }));

            ReactiveAssert.Throws<InvalidOperationException>(() =>
                Observable.Create<int>(o =>
                {
                    o.OnError(new Exception());
                    return Disposable.Empty;
                }).Subscribe(x => { }, ex => { throw new InvalidOperationException(); }));

            ReactiveAssert.Throws<InvalidOperationException>(() =>
                Observable.Create<int>(o =>
                {
                    o.OnCompleted();
                    return Disposable.Empty;
                }).Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); }));
        }

        // Null iterator function and null observer are rejected by the iterator-based Create.
        [TestMethod]
        public void Iterate_ArgumentChecking()
        {
            ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Create<int>(default));
            ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Create(DummyFunc<IObserver<int>, IEnumerable<IObservable<object>>>.Instance).Subscribe(null));
        }

        // Iterator that completes the observer explicitly before yielding zs.
        private IEnumerable<IObservable<object>> ToIterate_Complete(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, IObserver<int> observer)
        {
            observer.OnNext(1);
            yield return xs.Select(x => new object());

            observer.OnNext(2);
            yield return ys.Select(x => new object());

            observer.OnNext(3);
            observer.OnCompleted();
            yield return zs.Select(x => new object());

            observer.OnNext(4);
        }

        // After the explicit OnCompleted at 280, zs is subscribed and unsubscribed at the same tick
        // and value 4 is never observed.
        [TestMethod]
        public void Iterate_Complete()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateColdObservable(
                OnNext(10, 1),
                OnNext(20, 2),
                OnNext(30, 3),
                OnNext(40, 4),
                OnCompleted<int>(50)
            );

            var ys = scheduler.CreateColdObservable(
                OnNext(10, 1),
                OnNext(20, 2),
                OnCompleted<int>(30)
            );

            var zs = scheduler.CreateColdObservable(
                OnNext(10, 1),
                OnNext(20, 2),
                OnNext(30, 3),
                OnNext(40, 4),
                OnNext(50, 5),
                OnCompleted<int>(60)
            );

            var res = scheduler.Start(() => ObservableEx.Create<int>(observer => ToIterate_Complete(xs, ys, zs, observer)));

            res.Messages.AssertEqual(
                OnNext(200, 1),
                OnNext(250, 2),
                OnNext(280, 3),
                OnCompleted<int>(280)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 250)
            );

            ys.Subscriptions.AssertEqual(
                Subscribe(250, 280)
            );

            zs.Subscriptions.AssertEqual(
                Subscribe(280, 280)
            );
        }

        // Iterator that never calls OnCompleted; completion is signalled when the iterator ends.
        private IEnumerable<IObservable<object>> ToIterate_Complete_Implicit(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, IObserver<int> observer)
        {
            observer.OnNext(1);
            yield return xs.Select(x => new object());

            observer.OnNext(2);
            yield return ys.Select(x => new object());

            observer.OnNext(3);
            yield return zs.Select(x => new object());

            observer.OnNext(4);
        }

        // The sequence completes at 340, when zs has completed and the iterator runs to its end.
        [TestMethod]
        public void Iterate_Complete_Implicit()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateColdObservable(
                OnNext(10, 1),
                OnNext(20, 2),
                OnNext(30, 3),
                OnNext(40, 4),
                OnCompleted<int>(50)
            );

            var ys = scheduler.CreateColdObservable(
                OnNext(10, 1),
                OnNext(20, 2),
                OnCompleted<int>(30)
            );

            var zs = scheduler.CreateColdObservable(
                OnNext(10, 1),
                OnNext(20, 2),
                OnNext(30, 3),
                OnNext(40, 4),
                OnNext(50, 5),
                OnCompleted<int>(60)
            );

            var res = scheduler.Start(() => ObservableEx.Create<int>(observer => ToIterate_Complete_Implicit(xs, ys, zs, observer)));

            res.Messages.AssertEqual(
                OnNext(200, 1),
                OnNext(250, 2),
                OnNext(280, 3),
                OnNext(340, 4),
                OnCompleted<int>(340)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 250)
            );

            ys.Subscriptions.AssertEqual(
                Subscribe(250, 280)
            );

            zs.Subscriptions.AssertEqual(
                Subscribe(280, 340)
            );
        }

        // Iterator that throws 'ex' after ys has been consumed.
        private IEnumerable<IObservable<object>> ToIterate_Throw(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, IObserver<int> observer, Exception ex)
        {
            observer.OnNext(1);
            yield return xs.Select(x => new object());

            observer.OnNext(2);
            yield return ys.Select(x => new object());

            observer.OnNext(3);

            // Presumably guarded only so the statements below are not flagged as unreachable.
            if (xs != null)
            {
                throw ex;
            }

            yield return zs.Select(x => new object());

            observer.OnNext(4);
            observer.OnCompleted();
        }

        // An exception thrown inside the iterator is delivered as OnError; zs is never subscribed.
        [TestMethod]
        public void Iterate_Iterator_Throw()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateColdObservable(
                OnNext(10, 1),
                OnNext(20, 2),
                OnNext(30, 3),
                OnNext(40, 4),
                OnCompleted<int>(50)
            );

            var ys = scheduler.CreateColdObservable(
                OnNext(10, 1),
                OnNext(20, 2),
                OnCompleted<int>(30)
            );

            var zs = scheduler.CreateColdObservable(
                OnNext(10, 1),
                OnNext(20, 2),
                OnNext(30, 3),
                OnNext(40, 4),
                OnNext(50, 5),
                OnCompleted<int>(60)
            );

            var ex = new Exception();

            var res = scheduler.Start(() => ObservableEx.Create<int>(observer => ToIterate_Throw(xs, ys, zs, observer, ex)));

            res.Messages.AssertEqual(
                OnNext(200, 1),
                OnNext(250, 2),
                OnNext(280, 3),
                OnError<int>(280, ex)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 250)
            );

            ys.Subscriptions.AssertEqual(
                Subscribe(250, 280)
            );

            zs.Subscriptions.AssertEqual(
            );
        }

        // Iterator that signals OnError on the observer before yielding ys.
        private IEnumerable<IObservable<object>> ToIterate_Error(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, IObserver<int> observer, Exception ex)
        {
            observer.OnNext(1);
            yield return xs.Select(x => new object());

            observer.OnNext(2);
            observer.OnError(ex);

            yield return ys.Select(x => new object());

            observer.OnNext(3);
            yield return zs.Select(x => new object());

            observer.OnNext(4);
            observer.OnCompleted();
        }

        // After the explicit OnError at 250, ys is subscribed and unsubscribed at the same tick
        // and zs is never subscribed.
        [TestMethod]
        public void Iterate_Iterator_Error()
        {
            var scheduler = new TestScheduler();

            var ex = new Exception();

            var xs = scheduler.CreateColdObservable(
                OnNext(10, 1),
                OnNext(20, 2),
                OnNext(30, 3),
                OnNext(40, 4),
                OnCompleted<int>(50)
            );

            var ys = scheduler.CreateColdObservable(
                OnNext(10, 1),
                OnNext(20, 2),
                OnCompleted<int>(30)
            );

            var zs = scheduler.CreateColdObservable(
                OnNext(10, 1),
                OnNext(20, 2),
                OnNext(30, 3),
                OnNext(40, 4),
                OnNext(50, 5),
                OnCompleted<int>(60)
            );

            var res = scheduler.Start(() => ObservableEx.Create<int>(observer => ToIterate_Error(xs, ys, zs, observer, ex)));

            res.Messages.AssertEqual(
                OnNext(200, 1),
                OnNext(250, 2),
                OnError<int>(250, ex)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 250)
            );

            ys.Subscriptions.AssertEqual(
                Subscribe(250, 250)
            );

            zs.Subscriptions.AssertEqual(
            );
        }

        // Same shape as ToIterate_Complete_Implicit; used with a zs that does not complete.
        private IEnumerable<IObservable<object>> ToIterate_Complete_Dispose(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, IObserver<int> observer)
        {
            observer.OnNext(1);
            yield return xs.Select(x => new object());

            observer.OnNext(2);
            yield return ys.Select(x => new object());

            observer.OnNext(3);
            yield return zs.Select(x => new object());

            observer.OnNext(4);
        }

        // zs never completes, so the subscription to it lasts until tick 1000 and value 4 is never observed.
        [TestMethod]
        public void Iterate_Complete_Dispose()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateColdObservable(
                OnNext(10, 1),
                OnNext(20, 2),
                OnNext(30, 3),
                OnNext(40, 4),
                OnCompleted<int>(50)
            );

            var ys = scheduler.CreateColdObservable(
                OnNext(10, 1),
                OnNext(20, 2),
                OnCompleted<int>(30)
            );

            var zs = scheduler.CreateColdObservable(
                OnNext(100, 1),
                OnNext(200, 2),
                OnNext(300, 3),
                OnNext(400, 4),
                OnNext(500, 5),
                OnNext(600, 6),
                OnNext(700, 7),
                OnNext(800, 8),
                OnNext(900, 9),
                OnNext(1000, 10)
            );

            var res = scheduler.Start(() => ObservableEx.Create<int>(observer => ToIterate_Complete_Dispose(xs, ys, zs, observer)));

            res.Messages.AssertEqual(
                OnNext(200, 1),
                OnNext(250, 2),
                OnNext(280, 3)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 250)
            );

            ys.Subscriptions.AssertEqual(
                Subscribe(250, 280)
            );

            zs.Subscriptions.AssertEqual(
                Subscribe(280, 1000)
            );
        }

        // End-to-end scenario without virtual time: the iterator reports the last value of each range.
        [TestMethod]
        public void IteratorScenario()
        {
            var xs = ObservableEx.Create<int>(o => _IteratorScenario(100, 1000, o));

            xs.AssertEqual(new[] { 100, 1000 }.ToObservable());
        }

        // Yields two ranges in turn and forwards each one's Value to 'results' once it has been awaited.
        private static IEnumerable<IObservable<object>> _IteratorScenario(int x, int y, IObserver<int> results)
        {
            var xs = Observable.Range(1, x).ToListObservable();
            yield return xs;

            results.OnNext(xs.Value);

            var ys = Observable.Range(1, y).ToListObservable();
            yield return ys;

            results.OnNext(ys.Value);
        }

        // Null iterator function and null observer are rejected by the observer-less Create overload.
        [TestMethod]
        public void Iterate_Void_ArgumentChecking()
        {
            ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Create(default));
            ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Create(DummyFunc<IEnumerable<IObservable<object>>>.Instance).Subscribe(null));
        }

        // Iterator that awaits xs, ys and zs in order.
        private IEnumerable<IObservable<object>> ToIterate_Void_Complete(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs)
        {
            yield return xs.Select(x => new object());

            yield return ys.Select(x => new object());

            yield return zs.Select(x => new object());
        }

        // The resulting sequence produces no values and completes at 340, once zs has completed.
        [TestMethod]
        public void Iterate_Void_Complete()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateColdObservable(
                OnNext(10, 1),
                OnNext(20, 2),
                OnNext(30, 3),
                OnNext(40, 4),
                OnCompleted<int>(50)
            );

            var ys = scheduler.CreateColdObservable(
                OnNext(10, 1),
                OnNext(20, 2),
                OnCompleted<int>(30)
            );

            var zs = scheduler.CreateColdObservable(
                OnNext(10, 1),
                OnNext(20, 2),
                OnNext(30, 3),
                OnNext(40, 4),
                OnNext(50, 5),
                OnCompleted<int>(60)
            );

            var res = scheduler.Start(() => ObservableEx.Create(() => ToIterate_Void_Complete(xs, ys, zs)));

            res.Messages.AssertEqual(
                OnCompleted<Unit>(340)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 250)
            );

            ys.Subscriptions.AssertEqual(
                Subscribe(250, 280)
            );

            zs.Subscriptions.AssertEqual(
                Subscribe(280, 340)
            );
        }

        // Same body as ToIterate_Void_Complete; kept separate to mirror the observer-based tests.
        private IEnumerable<IObservable<object>> ToIterate_Void_Complete_Implicit(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs)
        {
            yield return xs.Select(x => new object());

            yield return ys.Select(x => new object());

            yield return zs.Select(x => new object());
        }

        // Completion is signalled at 340 when the iterator runs to its end.
        [TestMethod]
        public void Iterate_Void_Complete_Implicit()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateColdObservable(
                OnNext(10, 1),
                OnNext(20, 2),
                OnNext(30, 3),
                OnNext(40, 4),
                OnCompleted<int>(50)
            );

            var ys = scheduler.CreateColdObservable(
                OnNext(10, 1),
                OnNext(20, 2),
                OnCompleted<int>(30)
            );

            var zs = scheduler.CreateColdObservable(
                OnNext(10, 1),
                OnNext(20, 2),
                OnNext(30, 3),
                OnNext(40, 4),
                OnNext(50, 5),
                OnCompleted<int>(60)
            );

            var res = scheduler.Start(() => ObservableEx.Create(() => ToIterate_Void_Complete_Implicit(xs, ys, zs)));

            res.Messages.AssertEqual(
                OnCompleted<Unit>(340)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 250)
            );

            ys.Subscriptions.AssertEqual(
                Subscribe(250, 280)
            );

            zs.Subscriptions.AssertEqual(
                Subscribe(280, 340)
            );
        }

        // Iterator that throws 'ex' after ys has been consumed.
        private IEnumerable<IObservable<object>> ToIterate_Void_Throw(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, Exception ex)
        {
            yield return xs.Select(x => new object());

            yield return ys.Select(x => new object());

            // Presumably guarded only so the statement below is not flagged as unreachable.
            if (xs != null)
            {
                throw ex;
            }

            yield return zs.Select(x => new object());
        }

        // An exception thrown inside the iterator is delivered as OnError; zs is never subscribed.
        [TestMethod]
        public void Iterate_Void_Iterator_Throw()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateColdObservable(
                OnNext(10, 1),
                OnNext(20, 2),
                OnNext(30, 3),
                OnNext(40, 4),
                OnCompleted<int>(50)
            );

            var ys = scheduler.CreateColdObservable(
                OnNext(10, 1),
                OnNext(20, 2),
                OnCompleted<int>(30)
            );

            var zs = scheduler.CreateColdObservable(
                OnNext(10, 1),
                OnNext(20, 2),
                OnNext(30, 3),
                OnNext(40, 4),
                OnNext(50, 5),
                OnCompleted<int>(60)
            );

            var ex = new Exception();

            var res = scheduler.Start(() => ObservableEx.Create(() => ToIterate_Void_Throw(xs, ys, zs, ex)));

            res.Messages.AssertEqual(
                OnError<Unit>(280, ex)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 250)
            );

            ys.Subscriptions.AssertEqual(
                Subscribe(250, 280)
            );

            zs.Subscriptions.AssertEqual(
            );
        }

        // Same body as ToIterate_Void_Complete; used with a zs that does not complete.
        private IEnumerable<IObservable<object>> ToIterate_Void_Complete_Dispose(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs)
        {
            yield return xs.Select(x => new object());

            yield return ys.Select(x => new object());

            yield return zs.Select(x => new object());
        }

        // zs never completes, so no notification is recorded and the subscription to zs lasts until tick 1000.
        [TestMethod]
        public void Iterate_Void_Complete_Dispose()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateColdObservable(
                OnNext(10, 1),
                OnNext(20, 2),
                OnNext(30, 3),
                OnNext(40, 4),
                OnCompleted<int>(50)
            );

            var ys = scheduler.CreateColdObservable(
                OnNext(10, 1),
                OnNext(20, 2),
                OnCompleted<int>(30)
            );

            var zs = scheduler.CreateColdObservable(
                OnNext(100, 1),
                OnNext(200, 2),
                OnNext(300, 3),
                OnNext(400, 4),
                OnNext(500, 5),
                OnNext(600, 6),
                OnNext(700, 7),
                OnNext(800, 8),
                OnNext(900, 9),
                OnNext(1000, 10)
            );

            var res = scheduler.Start(() => ObservableEx.Create(() => ToIterate_Void_Complete_Dispose(xs, ys, zs)));

            res.Messages.AssertEqual(
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 250)
            );

            ys.Subscriptions.AssertEqual(
                Subscribe(250, 280)
            );

            zs.Subscriptions.AssertEqual(
                Subscribe(280, 1000)
            );
        }

        // An exception thrown by the iterator factory itself is delivered as a single OnError.
        [TestMethod]
        public void Iterate_Void_Func_Throw()
        {
            var scheduler = new TestScheduler();

            var obs = scheduler.Start(() => ObservableEx.Create(() => { throw new InvalidOperationException(); }));

            Assert.Equal(1, obs.Messages.Count);

            var notification = obs.Messages[0].Value;
            Assert.Equal(NotificationKind.OnError, notification.Kind);
            Assert.IsType<InvalidOperationException>(notification.Exception);
        }

        // Yields two ranges in turn without reporting any values.
        private static IEnumerable<IObservable<object>> _IteratorScenario_Void(int x, int y)
        {
            var xs = Observable.Range(1, x).ToListObservable();
            yield return xs;

            var ys = Observable.Range(1, y).ToListObservable();
            yield return ys;
        }

        // End-to-end scenario without virtual time: the observer-less overload yields an empty sequence.
        [TestMethod]
        public void IteratorScenario_Void()
        {
            var xs = ObservableEx.Create(() => _IteratorScenario_Void(100, 1000));

            xs.AssertEqual(new Unit[] { }.ToObservable());
        }
    }
}