// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. using System; using System.Reactive.Concurrency; using System.Reactive.Subjects; using System.Threading; using System.Threading.Tasks; using Microsoft.Reactive.Testing; using Xunit; namespace ReactiveTests.Tests { public partial class AsyncSubjectTest : ReactiveTest { [Fact] public void Subscribe_ArgumentChecking() { ReactiveAssert.Throws(() => new AsyncSubject().Subscribe(null)); } [Fact] public void OnError_ArgumentChecking() { ReactiveAssert.Throws(() => new AsyncSubject().OnError(null)); } [Fact] public void Infinite() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(70, 1), OnNext(110, 2), OnNext(220, 3), OnNext(270, 4), OnNext(340, 5), OnNext(410, 6), OnNext(520, 7), OnNext(630, 8), OnNext(710, 9), OnNext(870, 10), OnNext(940, 11), OnNext(1020, 12) ); var subject = default(AsyncSubject); var subscription = default(IDisposable); var results1 = scheduler.CreateObserver(); var subscription1 = default(IDisposable); var results2 = scheduler.CreateObserver(); var subscription2 = default(IDisposable); var results3 = scheduler.CreateObserver(); var subscription3 = default(IDisposable); scheduler.ScheduleAbsolute(100, () => subject = new AsyncSubject()); scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); scheduler.ScheduleAbsolute(800, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); scheduler.Start(); results1.Messages.AssertEqual( ); results2.Messages.AssertEqual( ); results3.Messages.AssertEqual( ); } [Fact] public void Finite() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(70, 1), OnNext(110, 2), OnNext(220, 3), OnNext(270, 4), OnNext(340, 5), OnNext(410, 6), OnNext(520, 7), OnCompleted(630), OnNext(640, 9), OnCompleted(650), OnError(660, new Exception()) ); var subject = default(AsyncSubject); var subscription = default(IDisposable); var results1 = scheduler.CreateObserver(); var subscription1 = default(IDisposable); var results2 = scheduler.CreateObserver(); var subscription2 = default(IDisposable); var results3 = scheduler.CreateObserver(); var subscription3 = default(IDisposable); scheduler.ScheduleAbsolute(100, () => subject = new AsyncSubject()); scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); scheduler.ScheduleAbsolute(800, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); scheduler.Start(); results1.Messages.AssertEqual( ); results2.Messages.AssertEqual( OnNext(630, 7), OnCompleted(630) ); results3.Messages.AssertEqual( OnNext(900, 7), OnCompleted(900) ); } [Fact] public void Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(70, 1), OnNext(110, 2), OnNext(220, 3), OnNext(270, 4), OnNext(340, 5), OnNext(410, 6), OnNext(520, 7), OnError(630, ex), OnNext(640, 9), OnCompleted(650), OnError(660, new Exception()) ); var subject = default(AsyncSubject); var subscription = default(IDisposable); var results1 = scheduler.CreateObserver(); var subscription1 = default(IDisposable); var results2 = scheduler.CreateObserver(); var subscription2 = default(IDisposable); var results3 = scheduler.CreateObserver(); var subscription3 = default(IDisposable); scheduler.ScheduleAbsolute(100, () => subject = new AsyncSubject()); scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); scheduler.ScheduleAbsolute(800, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); scheduler.Start(); results1.Messages.AssertEqual( ); results2.Messages.AssertEqual( OnError(630, ex) ); results3.Messages.AssertEqual( OnError(900, ex) ); } [Fact] public void Canceled() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnCompleted(630), OnNext(640, 9), OnCompleted(650), OnError(660, new Exception()) ); var subject = default(AsyncSubject); var subscription = default(IDisposable); var results1 = scheduler.CreateObserver(); var subscription1 = default(IDisposable); var results2 = scheduler.CreateObserver(); var subscription2 = default(IDisposable); var results3 = scheduler.CreateObserver(); var subscription3 = default(IDisposable); scheduler.ScheduleAbsolute(100, () => subject = new AsyncSubject()); scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); scheduler.ScheduleAbsolute(800, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); scheduler.Start(); results1.Messages.AssertEqual( ); results2.Messages.AssertEqual( OnCompleted(630) ); results3.Messages.AssertEqual( OnCompleted(900) ); } [Fact] public void SubjectDisposed() { var scheduler = new TestScheduler(); var subject = default(AsyncSubject); var results1 = scheduler.CreateObserver(); var subscription1 = default(IDisposable); var results2 = scheduler.CreateObserver(); var subscription2 = default(IDisposable); var results3 = scheduler.CreateObserver(); var subscription3 = default(IDisposable); scheduler.ScheduleAbsolute(100, () => subject = new AsyncSubject()); scheduler.ScheduleAbsolute(200, () => subscription1 = subject.Subscribe(results1)); scheduler.ScheduleAbsolute(300, () => subscription2 = subject.Subscribe(results2)); scheduler.ScheduleAbsolute(400, () => subscription3 = subject.Subscribe(results3)); scheduler.ScheduleAbsolute(500, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(600, () => subject.Dispose()); scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); scheduler.ScheduleAbsolute(800, () => subscription3.Dispose()); scheduler.ScheduleAbsolute(150, () => subject.OnNext(1)); scheduler.ScheduleAbsolute(250, () => subject.OnNext(2)); scheduler.ScheduleAbsolute(350, () => subject.OnNext(3)); scheduler.ScheduleAbsolute(450, () => subject.OnNext(4)); scheduler.ScheduleAbsolute(550, () => subject.OnNext(5)); scheduler.ScheduleAbsolute(650, () => ReactiveAssert.Throws(() => subject.OnNext(6))); scheduler.ScheduleAbsolute(750, () => ReactiveAssert.Throws(() => subject.OnCompleted())); scheduler.ScheduleAbsolute(850, () => ReactiveAssert.Throws(() => subject.OnError(new Exception()))); scheduler.ScheduleAbsolute(950, () => ReactiveAssert.Throws(() => subject.Subscribe())); scheduler.Start(); results1.Messages.AssertEqual( ); results2.Messages.AssertEqual( ); results3.Messages.AssertEqual( ); } #if HAS_AWAIT #if !NO_THREAD [Fact] public void Await_Blocking() { var s = new AsyncSubject(); GetResult_Blocking(s.GetAwaiter()); } [Fact] public void Await_Throw() { var s = new AsyncSubject(); GetResult_Blocking_Throw(s.GetAwaiter()); } #endif #endif [Fact] public void GetResult_Empty() { var s = new AsyncSubject(); s.OnCompleted(); ReactiveAssert.Throws(() => s.GetResult()); } #if !NO_THREAD [Fact] public void GetResult_Blocking() { GetResult_Blocking(new AsyncSubject()); } private void GetResult_Blocking(AsyncSubject s) { Assert.False(s.IsCompleted); var e = new ManualResetEvent(false); new Thread(() => { e.WaitOne(); s.OnNext(42); s.OnCompleted(); }).Start(); var y = default(int); var t = new Thread(() => { y = s.GetResult(); }); t.Start(); while (t.ThreadState != ThreadState.WaitSleepJoin) ; e.Set(); t.Join(); Assert.Equal(42, y); Assert.True(s.IsCompleted); } [Fact] public void GetResult_Blocking_Throw() { GetResult_Blocking_Throw(new AsyncSubject()); } private void GetResult_Blocking_Throw(AsyncSubject s) { Assert.False(s.IsCompleted); var e = new ManualResetEvent(false); var ex = new Exception(); new Thread(() => { e.WaitOne(); s.OnError(ex); }).Start(); var y = default(Exception); var t = new Thread(() => { try { s.GetResult(); } catch (Exception ex_) { y = ex_; } }); t.Start(); while (t.ThreadState != ThreadState.WaitSleepJoin) ; e.Set(); t.Join(); Assert.Same(ex, y); Assert.True(s.IsCompleted); } #endif #if HAS_AWAIT [Fact] public void GetResult_Context() { var x = new AsyncSubject(); var ctx = new MyContext(); var e = new ManualResetEvent(false); Task.Run(() => { SynchronizationContext.SetSynchronizationContext(ctx); var a = x.GetAwaiter(); a.OnCompleted(() => { e.Set(); }); }); x.OnNext(42); x.OnCompleted(); e.WaitOne(); Assert.True(ctx.ran); } class MyContext : SynchronizationContext { public bool ran; public override void Post(SendOrPostCallback d, object state) { ran = true; d(state); } } #endif [Fact] public void HasObservers() { var s = new AsyncSubject(); Assert.False(s.HasObservers); var d1 = s.Subscribe(_ => { }); Assert.True(s.HasObservers); d1.Dispose(); Assert.False(s.HasObservers); var d2 = s.Subscribe(_ => { }); Assert.True(s.HasObservers); var d3 = s.Subscribe(_ => { }); Assert.True(s.HasObservers); d2.Dispose(); Assert.True(s.HasObservers); d3.Dispose(); Assert.False(s.HasObservers); } [Fact] public void HasObservers_Dispose1() { var s = new AsyncSubject(); Assert.False(s.HasObservers); Assert.False(s.IsDisposed); var d = s.Subscribe(_ => { }); Assert.True(s.HasObservers); Assert.False(s.IsDisposed); s.Dispose(); Assert.False(s.HasObservers); Assert.True(s.IsDisposed); d.Dispose(); Assert.False(s.HasObservers); Assert.True(s.IsDisposed); } [Fact] public void HasObservers_Dispose2() { var s = new AsyncSubject(); Assert.False(s.HasObservers); Assert.False(s.IsDisposed); var d = s.Subscribe(_ => { }); Assert.True(s.HasObservers); Assert.False(s.IsDisposed); d.Dispose(); Assert.False(s.HasObservers); Assert.False(s.IsDisposed); s.Dispose(); Assert.False(s.HasObservers); Assert.True(s.IsDisposed); } [Fact] public void HasObservers_Dispose3() { var s = new AsyncSubject(); Assert.False(s.HasObservers); Assert.False(s.IsDisposed); s.Dispose(); Assert.False(s.HasObservers); Assert.True(s.IsDisposed); } [Fact] public void HasObservers_OnCompleted() { var s = new AsyncSubject(); Assert.False(s.HasObservers); var d = s.Subscribe(_ => { }); Assert.True(s.HasObservers); s.OnNext(42); Assert.True(s.HasObservers); s.OnCompleted(); Assert.False(s.HasObservers); } [Fact] public void HasObservers_OnError() { var s = new AsyncSubject(); Assert.False(s.HasObservers); var d = s.Subscribe(_ => { }, ex => { }); Assert.True(s.HasObservers); s.OnNext(42); Assert.True(s.HasObservers); s.OnError(new Exception()); Assert.False(s.HasObservers); } } }