// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. using System; using System.Collections.Generic; using System.Reactive; using System.Reactive.Concurrency; using System.Reactive.Linq; using System.Threading; using Microsoft.Reactive.Testing; using ReactiveTests.Dummies; using Xunit; namespace ReactiveTests.Tests { public class SubscribeTest : ReactiveTest { [Fact] public void SubscribeToEnumerable_ArgumentChecking() { ReactiveAssert.Throws(() => Observable.Subscribe(null, DummyObserver.Instance)); ReactiveAssert.Throws(() => Observable.Subscribe(DummyEnumerable.Instance, null)); ReactiveAssert.Throws(() => Observable.Subscribe(null, DummyObserver.Instance, DummyScheduler.Instance)); ReactiveAssert.Throws(() => Observable.Subscribe(DummyEnumerable.Instance, DummyObserver.Instance, null)); ReactiveAssert.Throws(() => Observable.Subscribe(DummyEnumerable.Instance, null, DummyScheduler.Instance)); ReactiveAssert.Throws(() => NullEnumeratorEnumerable.Instance.Subscribe(Observer.Create(x => { }), Scheduler.CurrentThread)); } [Fact] public void SubscribeToEnumerable_Finite() { var scheduler = new TestScheduler(); var results = scheduler.CreateObserver(); var d = default(IDisposable); var xs = default(MockEnumerable); scheduler.ScheduleAbsolute(Created, () => xs = new MockEnumerable(scheduler, Enumerable_Finite())); scheduler.ScheduleAbsolute(Subscribed, () => d = xs.Subscribe(results, scheduler)); scheduler.ScheduleAbsolute(Disposed, () => d.Dispose()); scheduler.Start(); results.Messages.AssertEqual( OnNext(201, 1), OnNext(202, 2), OnNext(203, 3), OnNext(204, 4), OnNext(205, 5), OnCompleted(206) ); xs.Subscriptions.AssertEqual( Subscribe(200, 206) ); } [Fact] public void SubscribeToEnumerable_Infinite() { var scheduler = new TestScheduler(); var results = scheduler.CreateObserver(); var d = default(IDisposable); var xs = default(MockEnumerable); scheduler.ScheduleAbsolute(Created, () => xs = new MockEnumerable(scheduler, Enumerable_Infinite())); scheduler.ScheduleAbsolute(Subscribed, () => d = xs.Subscribe(results, scheduler)); scheduler.ScheduleAbsolute(210, () => d.Dispose()); scheduler.Start(); results.Messages.AssertEqual( OnNext(201, 1), OnNext(202, 1), OnNext(203, 1), OnNext(204, 1), OnNext(205, 1), OnNext(206, 1), OnNext(207, 1), OnNext(208, 1), OnNext(209, 1) ); xs.Subscriptions.AssertEqual( Subscribe(200, 210) ); } [Fact] public void SubscribeToEnumerable_Error() { var scheduler = new TestScheduler(); var results = scheduler.CreateObserver(); var d = default(IDisposable); var xs = default(MockEnumerable); var ex = new Exception(); scheduler.ScheduleAbsolute(Created, () => xs = new MockEnumerable(scheduler, Enumerable_Error(ex))); scheduler.ScheduleAbsolute(Subscribed, () => d = xs.Subscribe(results, scheduler)); scheduler.ScheduleAbsolute(Disposed, () => d.Dispose()); scheduler.Start(); results.Messages.AssertEqual( OnNext(201, 1), OnNext(202, 2), OnNext(203, 3), OnError(204, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 204) ); } [Fact] public void SubscribeToEnumerable_DefaultScheduler() { for (var i = 0; i < 100; i++) { var scheduler = new TestScheduler(); var results1 = new List(); var results2 = new List(); var s1 = new Semaphore(0, 1); var s2 = new Semaphore(0, 1); Observable.Subscribe(Enumerable_Finite(), Observer.Create(x => results1.Add(x), ex => { throw ex; }, () => s1.Release())); Observable.Subscribe(Enumerable_Finite(), Observer.Create(x => results2.Add(x), ex => { throw ex; }, () => s2.Release()), DefaultScheduler.Instance); s1.WaitOne(); s2.WaitOne(); results1.AssertEqual(results2); } } private IEnumerable Enumerable_Finite() { yield return 1; yield return 2; yield return 3; yield return 4; yield return 5; yield break; } private IEnumerable Enumerable_Infinite() { while (true) { yield return 1; } } private IEnumerable Enumerable_Error(Exception exception) { yield return 1; yield return 2; yield return 3; throw exception; } #region Subscribe [Fact] public void Subscribe_ArgumentChecking() { var someObservable = Observable.Empty(); ReactiveAssert.Throws(() => ObservableExtensions.Subscribe(default)); ReactiveAssert.Throws(() => ObservableExtensions.Subscribe(default, _ => { })); ReactiveAssert.Throws(() => ObservableExtensions.Subscribe(someObservable, default(Action))); ReactiveAssert.Throws(() => ObservableExtensions.Subscribe(default, _ => { }, () => { })); ReactiveAssert.Throws(() => ObservableExtensions.Subscribe(someObservable, default, () => { })); ReactiveAssert.Throws(() => ObservableExtensions.Subscribe(someObservable, _ => { }, default(Action))); ReactiveAssert.Throws(() => ObservableExtensions.Subscribe(default, _ => { }, (Exception _) => { })); ReactiveAssert.Throws(() => ObservableExtensions.Subscribe(someObservable, default, (Exception _) => { })); ReactiveAssert.Throws(() => ObservableExtensions.Subscribe(someObservable, _ => { }, default(Action))); ReactiveAssert.Throws(() => ObservableExtensions.Subscribe(default, _ => { }, (Exception _) => { }, () => { })); ReactiveAssert.Throws(() => ObservableExtensions.Subscribe(someObservable, default, (Exception _) => { }, () => { })); ReactiveAssert.Throws(() => ObservableExtensions.Subscribe(someObservable, _ => { }, default, () => { })); ReactiveAssert.Throws(() => ObservableExtensions.Subscribe(someObservable, _ => { }, (Exception _) => { }, default(Action))); } [Fact] public void Subscribe_None_Return() { Observable.Return(1, Scheduler.Immediate).Subscribe(); } [Fact] public void Subscribe_None_Throw() { var ex = new Exception(); var xs = Observable.Throw(ex, Scheduler.Immediate); ReactiveAssert.Throws(ex, () => xs.Subscribe()); } [Fact] public void Subscribe_None_Empty() { Observable.Empty(Scheduler.Immediate).Subscribe((int _) => { Assert.True(false); }); } [Fact] public void Subscribe_OnNext_Return() { var _x = -1; Observable.Return(42, Scheduler.Immediate).Subscribe((int x) => { _x = x; }); Assert.Equal(42, _x); } [Fact] public void Subscribe_OnNext_Throw() { var ex = new Exception(); var xs = Observable.Throw(ex, Scheduler.Immediate); ReactiveAssert.Throws(ex, () => xs.Subscribe(_ => { Assert.True(false); })); } [Fact] public void Subscribe_OnNext_Empty() { Observable.Empty(Scheduler.Immediate).Subscribe((int _) => { Assert.True(false); }); } [Fact] public void Subscribe_OnNextOnCompleted_Return() { var finished = false; var _x = -1; Observable.Return(42, Scheduler.Immediate).Subscribe((int x) => { _x = x; }, () => { finished = true; }); Assert.Equal(42, _x); Assert.True(finished); } [Fact] public void Subscribe_OnNextOnCompleted_Throw() { var ex = new Exception(); var xs = Observable.Throw(ex, Scheduler.Immediate); ReactiveAssert.Throws(ex, () => xs.Subscribe(_ => { Assert.True(false); }, () => { Assert.True(false); })); } [Fact] public void Subscribe_OnNextOnCompleted_Empty() { var finished = false; Observable.Empty(Scheduler.Immediate).Subscribe((int _) => { Assert.True(false); }, () => { finished = true; }); Assert.True(finished); } #endregion #region Subscribe with CancellationToken [Fact] public void Subscribe_CT_ArgumentChecking() { var someObservable = Observable.Empty(); var someObserver = Observer.Create(_ => { }); var ct = CancellationToken.None; ReactiveAssert.Throws(() => ObservableExtensions.Subscribe(default, someObserver, ct)); ReactiveAssert.Throws(() => ObservableExtensions.Subscribe(someObservable, default(IObserver), ct)); ReactiveAssert.Throws(() => ObservableExtensions.Subscribe(default, ct)); ReactiveAssert.Throws(() => ObservableExtensions.Subscribe(default, _ => { }, ct)); ReactiveAssert.Throws(() => ObservableExtensions.Subscribe(someObservable, default(Action), ct)); ReactiveAssert.Throws(() => ObservableExtensions.Subscribe(default, _ => { }, () => { }, ct)); ReactiveAssert.Throws(() => ObservableExtensions.Subscribe(someObservable, default, () => { }, ct)); ReactiveAssert.Throws(() => ObservableExtensions.Subscribe(someObservable, _ => { }, default(Action), ct)); ReactiveAssert.Throws(() => ObservableExtensions.Subscribe(default, _ => { }, (Exception _) => { }, ct)); ReactiveAssert.Throws(() => ObservableExtensions.Subscribe(someObservable, default, (Exception _) => { }, ct)); ReactiveAssert.Throws(() => ObservableExtensions.Subscribe(someObservable, _ => { }, default(Action), ct)); ReactiveAssert.Throws(() => ObservableExtensions.Subscribe(default, _ => { }, (Exception _) => { }, () => { }, ct)); ReactiveAssert.Throws(() => ObservableExtensions.Subscribe(someObservable, default, (Exception _) => { }, () => { }, ct)); ReactiveAssert.Throws(() => ObservableExtensions.Subscribe(someObservable, _ => { }, default, () => { }, ct)); ReactiveAssert.Throws(() => ObservableExtensions.Subscribe(someObservable, _ => { }, (Exception _) => { }, default, ct)); } [Fact] public void Subscribe_CT_None() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnCompleted(240) ); var obs = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(200, () => xs.Subscribe(obs, CancellationToken.None)); scheduler.Start(); obs.Messages.AssertEqual( OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnCompleted(240) ); xs.Subscriptions.AssertEqual( Subscribe(200, Subscription.Infinite /* no auto-dispose when using CreateHotObservable */) ); } [Fact] public void Subscribe_CT_CancelBeforeBegin() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnCompleted(240) ); var cts = new CancellationTokenSource(); var obs = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(200, () => xs.Subscribe(obs, cts.Token)); scheduler.ScheduleAbsolute(150, cts.Cancel); scheduler.Start(); obs.Messages.AssertEqual( ); xs.Subscriptions.AssertEqual( ); } [Fact] public void Subscribe_CT_CancelMiddle() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnCompleted(240) ); var cts = new CancellationTokenSource(); var obs = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(200, () => xs.Subscribe(obs, cts.Token)); scheduler.ScheduleAbsolute(225, cts.Cancel); scheduler.Start(); obs.Messages.AssertEqual( OnNext(210, 1), OnNext(220, 2) ); xs.Subscriptions.AssertEqual( Subscribe(200, 225) ); } [Fact] public void Subscribe_CT_CancelAfterEnd() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnError(240, ex) ); var cts = new CancellationTokenSource(); var obs = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(200, () => xs.Subscribe(obs, cts.Token)); scheduler.ScheduleAbsolute(250, cts.Cancel); scheduler.Start(); obs.Messages.AssertEqual( OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnError(240, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, Subscription.Infinite /* no auto-dispose when using CreateHotObservable */) ); } [Fact] public void Subscribe_CT_NeverCancel() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnCompleted(240) ); var cts = new CancellationTokenSource(); var obs = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(200, () => xs.Subscribe(obs, cts.Token)); scheduler.Start(); obs.Messages.AssertEqual( OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnCompleted(240) ); xs.Subscriptions.AssertEqual( Subscribe(200, Subscription.Infinite /* no auto-dispose when using CreateHotObservable */) ); } [Fact] public void Subscribe_CT_Overloads_AlreadyCancelled() { var xs = Observable.Defer(() => { Assert.True(false); return Observable.Return(42, Scheduler.Immediate); }); var cts = new CancellationTokenSource(); cts.Cancel(); xs.Subscribe(cts.Token); xs.Subscribe(_ => { }, cts.Token); xs.Subscribe(_ => { }, ex => { }, cts.Token); xs.Subscribe(_ => { }, () => { }, cts.Token); xs.Subscribe(_ => { }, ex => { }, () => { }, cts.Token); xs.Subscribe(Observer.Create(_ => { }, ex => { }, () => { }), cts.Token); } [Fact] public void Subscribe_CT_Overloads_None() { var i = 0; var n = 0; var e = 0; var c = 0; var xs = Observable.Defer(() => { i++; return Observable.Return(42, Scheduler.Immediate); }); xs.Subscribe(CancellationToken.None); xs.Subscribe(_ => { n++; }, CancellationToken.None); xs.Subscribe(_ => { n++; }, ex => { e++; }, CancellationToken.None); xs.Subscribe(_ => { n++; }, () => { c++; }, CancellationToken.None); xs.Subscribe(_ => { n++; }, ex => { e++; }, () => { c++; }, CancellationToken.None); xs.Subscribe(Observer.Create(_ => { n++; }, ex => { e++; }, () => { c++; }), CancellationToken.None); Assert.Equal(6, i); Assert.Equal(5, n); Assert.Equal(0, e); Assert.Equal(3, c); } [Fact] public void Subscribe_CT_CancelDuringCallback() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnCompleted(240) ); var cts = new CancellationTokenSource(); var n = 0; scheduler.ScheduleAbsolute(200, () => xs.Subscribe(x => { n++; if (x == 2) { cts.Cancel(); } }, cts.Token)); scheduler.Start(); Assert.Equal(2, n); xs.Subscriptions.AssertEqual( Subscribe(200, 220) ); } #endregion } }