// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information. 

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Reactive.Testing;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Assert = Xunit.Assert;

namespace ReactiveTests.Tests
{
    // Tests for the Observer factory/extension helpers: ToObserver, ToNotifier, Create,
    // AsObserver, Checked, Synchronize, NotifyOn, ToProgress.
    [TestClass]
    public partial class ObserverTest : ReactiveTest
    {
        // ToObserver must reject a null notification handler.
        [TestMethod]
        public void ToObserver_ArgumentChecking()
        {
            ReactiveAssert.Throws<ArgumentNullException>(() => Observer.ToObserver(default(Action<Notification<int>>)));
        }

        // OnNext on the adapted observer reaches the handler exactly once as an OnNext notification.
        [TestMethod]
        public void ToObserver_NotificationOnNext()
        {
            var i = 0;
            Action<Notification<int>> next = n =>
            {
                Assert.Equal(0, i++);
                Assert.Equal(NotificationKind.OnNext, n.Kind);
                Assert.Equal(42, n.Value);
                Assert.Null(n.Exception);
                Assert.True(n.HasValue);
            };

            next.ToObserver().OnNext(42);

            // Guards against the handler never being invoked (the in-handler asserts would be skipped).
            Assert.Equal(1, i);
        }

        // OnError on the adapted observer reaches the handler exactly once as an OnError notification.
        [TestMethod]
        public void ToObserver_NotificationOnError()
        {
            var ex = new Exception();

            var i = 0;
            Action<Notification<int>> next = n =>
            {
                Assert.Equal(0, i++);
                Assert.Equal(NotificationKind.OnError, n.Kind);
                Assert.Same(ex, n.Exception);
                Assert.False(n.HasValue);
            };

            next.ToObserver().OnError(ex);

            // Guards against the handler never being invoked.
            Assert.Equal(1, i);
        }

        // OnCompleted on the adapted observer reaches the handler exactly once as an OnCompleted notification.
        [TestMethod]
        public void ToObserver_NotificationOnCompleted()
        {
            var i = 0;
            Action<Notification<int>> next = n =>
            {
                Assert.Equal(0, i++);
                Assert.Equal(NotificationKind.OnCompleted, n.Kind);
                Assert.False(n.HasValue);
            };

            next.ToObserver().OnCompleted();

            // Guards against the handler never being invoked.
            Assert.Equal(1, i);
        }

        // ToNotifier must reject a null observer.
        [TestMethod]
        public void ToNotifier_ArgumentChecking()
        {
            ReactiveAssert.Throws<ArgumentNullException>(() => Observer.ToNotifier<int>(default));
        }

        // Each notification kind passed to the notifier delegate is forwarded to the matching observer method.
        [TestMethod]
        public void ToNotifier_Forwards()
        {
            var obsn = new MyObserver();
            obsn.ToNotifier()(Notification.CreateOnNext(42));
            Assert.Equal(42, obsn.HasOnNext);

            var ex = new Exception();
            var obse = new MyObserver();
            obse.ToNotifier()(Notification.CreateOnError<int>(ex));
            Assert.Same(ex, obse.HasOnError);

            var obsc = new MyObserver();
            obsc.ToNotifier()(Notification.CreateOnCompleted<int>());
            Assert.True(obsc.HasOnCompleted);
        }

        // Every Observer.Create overload must reject a null delegate in any position.
        [TestMethod]
        public void Create_ArgumentChecking()
        {
            ReactiveAssert.Throws<ArgumentNullException>(() => Observer.Create<int>(default));
            ReactiveAssert.Throws<ArgumentNullException>(() => Observer.Create<int>(default, () => { }));
            ReactiveAssert.Throws<ArgumentNullException>(() => Observer.Create<int>(_ => { }, default(Action)));
            ReactiveAssert.Throws<ArgumentNullException>(() => Observer.Create<int>(default, (Exception _) => { }));
            ReactiveAssert.Throws<ArgumentNullException>(() => Observer.Create<int>(_ => { }, default(Action<Exception>)));
            ReactiveAssert.Throws<ArgumentNullException>(() => Observer.Create<int>(default, (Exception _) => { }, () => { }));
            ReactiveAssert.Throws<ArgumentNullException>(() => Observer.Create<int>(_ => { }, default, () => { }));
            ReactiveAssert.Throws<ArgumentNullException>(() => Observer.Create<int>(_ => { }, (Exception _) => { }, default));
        }

        // Create(onNext): OnNext invokes the handler; OnCompleted is accepted without a handler.
        [TestMethod]
        public void Create_OnNext()
        {
            var next = false;
            var res = Observer.Create<int>(x => { Assert.Equal(42, x); next = true; });

            res.OnNext(42);
            Assert.True(next);

            res.OnCompleted();
        }

        // Create(onNext): with no error handler supplied, OnError throws the very exception it was given.
        [TestMethod]
        public void Create_OnNext_HasError()
        {
            var ex = new Exception();
            var e_ = default(Exception);

            var next = false;
            var res = Observer.Create<int>(x => { Assert.Equal(42, x); next = true; });

            res.OnNext(42);
            Assert.True(next);

            try
            {
                res.OnError(ex);

                // Reaching this line means OnError did not throw; the resulting assertion
                // exception is caught below and then fails the Assert.Same check.
                Assert.True(false);
            }
            catch (Exception e)
            {
                e_ = e;
            }

            Assert.Same(ex, e_);
        }

        // Create(onNext, onCompleted): both handlers are invoked by their respective calls.
        [TestMethod]
        public void Create_OnNextOnCompleted()
        {
            var next = false;
            var completed = false;
            var res = Observer.Create<int>(x => { Assert.Equal(42, x); next = true; }, () => { completed = true; });

            res.OnNext(42);
            Assert.True(next);
            Assert.False(completed);

            res.OnCompleted();
            Assert.True(completed);
        }

        // Create(onNext, onCompleted): OnError throws the given exception and does not run the completion handler.
        [TestMethod]
        public void Create_OnNextOnCompleted_HasError()
        {
            var ex = new Exception();
            var e_ = default(Exception);

            var next = false;
            var completed = false;
            var res = Observer.Create<int>(x => { Assert.Equal(42, x); next = true; }, () => { completed = true; });

            res.OnNext(42);
            Assert.True(next);
            Assert.False(completed);

            try
            {
                res.OnError(ex);

                // See Create_OnNext_HasError: a non-throwing OnError still fails via Assert.Same below.
                Assert.True(false);
            }
            catch (Exception e)
            {
                e_ = e;
            }

            Assert.Same(ex, e_);
            Assert.False(completed);
        }

        // Create(onNext, onError): both handlers are invoked by their respective calls.
        [TestMethod]
        public void Create_OnNextOnError()
        {
            var ex = new Exception();

            // Starts false so that Assert.True(next) proves the onNext handler actually ran.
            var next = false;
            var error = false;
            var res = Observer.Create<int>(x => { Assert.Equal(42, x); next = true; }, e => { Assert.Same(ex, e); error = true; });

            res.OnNext(42);
            Assert.True(next);
            Assert.False(error);

            res.OnError(ex);
            Assert.True(error);
        }

        // Create(onNext, onError): OnCompleted is accepted and does not trigger the error handler.
        [TestMethod]
        public void Create_OnNextOnError_HitCompleted()
        {
            var ex = new Exception();

            // Starts false so that Assert.True(next) proves the onNext handler actually ran.
            var next = false;
            var error = false;
            var res = Observer.Create<int>(x => { Assert.Equal(42, x); next = true; }, e => { Assert.Same(ex, e); error = true; });

            res.OnNext(42);
            Assert.True(next);
            Assert.False(error);

            res.OnCompleted();
            Assert.False(error);
        }

        // Create(onNext, onError, onCompleted): OnCompleted runs only the completion handler.
        [TestMethod]
        public void Create_OnNextOnErrorOnCompleted1()
        {
            var ex = new Exception();

            // Starts false so that Assert.True(next) proves the onNext handler actually ran.
            var next = false;
            var error = false;
            var completed = false;
            var res = Observer.Create<int>(x => { Assert.Equal(42, x); next = true; }, e => { Assert.Same(ex, e); error = true; }, () => { completed = true; });

            res.OnNext(42);
            Assert.True(next);
            Assert.False(error);
            Assert.False(completed);

            res.OnCompleted();
            Assert.True(completed);
            Assert.False(error);
        }

        // Create(onNext, onError, onCompleted): OnError runs only the error handler.
        [TestMethod]
        public void Create_OnNextOnErrorOnCompleted2()
        {
            var ex = new Exception();

            // Starts false so that Assert.True(next) proves the onNext handler actually ran.
            var next = false;
            var error = false;
            var completed = false;
            var res = Observer.Create<int>(x => { Assert.Equal(42, x); next = true; }, e => { Assert.Same(ex, e); error = true; }, () => { completed = true; });

            res.OnNext(42);
            Assert.True(next);
            Assert.False(error);
            Assert.False(completed);

            res.OnError(ex);
            Assert.True(error);
            Assert.False(completed);
        }

        // AsObserver must reject a null observer.
        [TestMethod]
        public void AsObserver_ArgumentChecking()
        {
            ReactiveAssert.Throws<ArgumentNullException>(() => Observer.AsObserver<int>(default));
        }

        // AsObserver returns a different object than the observer it wraps.
        [TestMethod]
        public void AsObserver_Hides()
        {
            var obs = new MyObserver();
            var res = obs.AsObserver();

            Assert.False(ReferenceEquals(obs, res));
        }

        // The AsObserver wrapper forwards OnNext, OnError and OnCompleted to the wrapped observer.
        [TestMethod]
        public void AsObserver_Forwards()
        {
            var obsn = new MyObserver();
            obsn.AsObserver().OnNext(42);
            Assert.Equal(42, obsn.HasOnNext);

            var ex = new Exception();
            var obse = new MyObserver();
            obse.AsObserver().OnError(ex);
            Assert.Same(ex, obse.HasOnError);

            var obsc = new MyObserver();
            obsc.AsObserver().OnCompleted();
            Assert.True(obsc.HasOnCompleted);
        }

        // Recording observer: stores the last value / error / completion it received.
        private class MyObserver : IObserver<int>
        {
            public void OnNext(int value)
            {
                HasOnNext = value;
            }

            public void OnError(Exception exception)
            {
                HasOnError = exception;
            }

            public void OnCompleted()
            {
                HasOnCompleted = true;
            }

            public int HasOnNext { get; set; }
            public Exception HasOnError { get; set; }
            public bool HasOnCompleted { get; set; }
        }

        // Checked must reject a null observer.
        [TestMethod]
        public void Observer_Checked_ArgumentChecking()
        {
            ReactiveAssert.Throws<ArgumentNullException>(() => Observer.Checked(default(IObserver<int>)));
        }

        // After OnCompleted, further terminal calls on a checked observer throw InvalidOperationException.
        [TestMethod]
        public void Observer_Checked_AlreadyTerminated_Completed()
        {
            var m = 0;
            var n = 0;

            var o = Observer.Create<int>(_ => { m++; }, ex => { Assert.True(false); }, () => { n++; }).Checked();

            o.OnNext(1);
            o.OnNext(2);
            o.OnCompleted();

            ReactiveAssert.Throws<InvalidOperationException>(() => o.OnCompleted());
            ReactiveAssert.Throws<InvalidOperationException>(() => o.OnError(new Exception()));

            Assert.Equal(2, m);
            Assert.Equal(1, n);
        }

        // After OnError, further terminal calls on a checked observer throw InvalidOperationException.
        [TestMethod]
        public void Observer_Checked_AlreadyTerminated_Error()
        {
            var m = 0;
            var n = 0;

            var o = Observer.Create<int>(_ => { m++; }, ex => { n++; }, () => { Assert.True(false); }).Checked();

            o.OnNext(1);
            o.OnNext(2);
            o.OnError(new Exception());

            ReactiveAssert.Throws<InvalidOperationException>(() => o.OnCompleted());
            ReactiveAssert.Throws<InvalidOperationException>(() => o.OnError(new Exception()));

            Assert.Equal(2, m);
            Assert.Equal(1, n);
        }

        // Calling back into a checked observer from inside its OnNext handler throws InvalidOperationException.
        [TestMethod]
        public void Observer_Checked_Reentrant_Next()
        {
            var n = 0;

            var o = default(IObserver<int>);
            o = Observer.Create<int>(
                _ =>
                {
                    n++;

                    ReactiveAssert.Throws<InvalidOperationException>(() => o.OnNext(9));
                    ReactiveAssert.Throws<InvalidOperationException>(() => o.OnError(new Exception()));
                    ReactiveAssert.Throws<InvalidOperationException>(() => o.OnCompleted());
                },
                ex =>
                {
                    Assert.True(false);
                },
                () =>
                {
                    Assert.True(false);
                })
                .Checked();

            o.OnNext(1);

            Assert.Equal(1, n);
        }

        // Calling back into a checked observer from inside its OnError handler throws InvalidOperationException.
        [TestMethod]
        public void Observer_Checked_Reentrant_Error()
        {
            var n = 0;

            var o = default(IObserver<int>);
            o = Observer.Create<int>(
                _ =>
                {
                    Assert.True(false);
                },
                ex =>
                {
                    n++;

                    ReactiveAssert.Throws<InvalidOperationException>(() => o.OnNext(9));
                    ReactiveAssert.Throws<InvalidOperationException>(() => o.OnError(new Exception()));
                    ReactiveAssert.Throws<InvalidOperationException>(() => o.OnCompleted());
                },
                () =>
                {
                    Assert.True(false);
                })
                .Checked();

            o.OnError(new Exception());

            Assert.Equal(1, n);
        }

        // Calling back into a checked observer from inside its OnCompleted handler throws InvalidOperationException.
        [TestMethod]
        public void Observer_Checked_Reentrant_Completed()
        {
            var n = 0;

            var o = default(IObserver<int>);
            o = Observer.Create<int>(
                _ =>
                {
                    Assert.True(false);
                },
                ex =>
                {
                    Assert.True(false);
                },
                () =>
                {
                    n++;

                    ReactiveAssert.Throws<InvalidOperationException>(() => o.OnNext(9));
                    ReactiveAssert.Throws<InvalidOperationException>(() => o.OnError(new Exception()));
                    ReactiveAssert.Throws<InvalidOperationException>(() => o.OnCompleted());
                })
                .Checked();

            o.OnCompleted();

            Assert.Equal(1, n);
        }

        // Every Synchronize overload must reject a null observer and a null gate/lock.
        [TestMethod]
        public void Observer_Synchronize_ArgumentChecking()
        {
            ReactiveAssert.Throws<ArgumentNullException>(() => Observer.Synchronize(default(IObserver<int>)));
            ReactiveAssert.Throws<ArgumentNullException>(() => Observer.Synchronize(default(IObserver<int>), true));
            ReactiveAssert.Throws<ArgumentNullException>(() => Observer.Synchronize(default(IObserver<int>), new object()));
            ReactiveAssert.Throws<ArgumentNullException>(() => Observer.Synchronize(new MyObserver(), default(object)));
            ReactiveAssert.Throws<ArgumentNullException>(() => Observer.Synchronize(default(IObserver<int>), new AsyncLock()));
            ReactiveAssert.Throws<ArgumentNullException>(() => Observer.Synchronize(new MyObserver(), default(AsyncLock)));
        }

        // Synchronize(observer): a nested OnNext issued from within OnNext runs while the outer call is still active.
        [TestMethod]
        public void Observer_Synchronize_Monitor_Reentrant1()
        {
            var res = false;
            var inOne = false;

            var s = default(IObserver<int>);

            var o = Observer.Create<int>(x =>
            {
                if (x == 1)
                {
                    inOne = true;
                    s.OnNext(2);
                    inOne = false;
                }
                else if (x == 2)
                {
                    // True only if value 2 is delivered while value 1 is still being handled.
                    res = inOne;
                }
            });

            s = Observer.Synchronize(o);

            s.OnNext(1);

            Assert.True(res);
        }

        // Synchronize(observer, gate object): nested OnNext runs while the outer call is still active.
        [TestMethod]
        public void Observer_Synchronize_Monitor_Reentrant2()
        {
            var res = false;
            var inOne = false;

            var s = default(IObserver<int>);

            var o = Observer.Create<int>(x =>
            {
                if (x == 1)
                {
                    inOne = true;
                    s.OnNext(2);
                    inOne = false;
                }
                else if (x == 2)
                {
                    res = inOne;
                }
            });

            s = Observer.Synchronize(o, new object());

            s.OnNext(1);

            Assert.True(res);
        }

        // Synchronize(observer, false): nested OnNext runs while the outer call is still active.
        [TestMethod]
        public void Observer_Synchronize_Monitor_Reentrant3()
        {
            var res = false;
            var inOne = false;

            var s = default(IObserver<int>);

            var o = Observer.Create<int>(x =>
            {
                if (x == 1)
                {
                    inOne = true;
                    s.OnNext(2);
                    inOne = false;
                }
                else if (x == 2)
                {
                    res = inOne;
                }
            });

            s = Observer.Synchronize(o, false);

            s.OnNext(1);

            Assert.True(res);
        }

        // Synchronize(observer, AsyncLock): a nested OnNext is delivered only after the outer call has finished.
        [TestMethod]
        public void Observer_Synchronize_AsyncLock_NonReentrant1()
        {
            var res = false;
            var inOne = false;

            var s = default(IObserver<int>);

            var o = Observer.Create<int>(x =>
            {
                if (x == 1)
                {
                    inOne = true;
                    s.OnNext(2);
                    inOne = false;
                }
                else if (x == 2)
                {
                    // True only if value 2 is delivered after handling of value 1 has finished.
                    res = !inOne;
                }
            });

            s = Observer.Synchronize(o, new AsyncLock());

            s.OnNext(1);

            Assert.True(res);
        }

        // Synchronize(observer, true): a nested OnNext is delivered only after the outer call has finished.
        [TestMethod]
        public void Observer_Synchronize_AsyncLock_NonReentrant2()
        {
            var res = false;
            var inOne = false;

            var s = default(IObserver<int>);

            var o = Observer.Create<int>(x =>
            {
                if (x == 1)
                {
                    inOne = true;
                    s.OnNext(2);
                    inOne = false;
                }
                else if (x == 2)
                {
                    res = !inOne;
                }
            });

            s = Observer.Synchronize(o, true);

            s.OnNext(1);

            Assert.True(res);
        }

        // Synchronize(observer, true) forwards each of OnNext, OnError and OnCompleted to the wrapped observer.
        [TestMethod]
        public void Observer_Synchronize_AsyncLock()
        {
            {
                var res = false;

                var s = default(IObserver<int>);

                var o = Observer.Create<int>(
                    x => { res = x == 1; },
                    ex => { Assert.True(false); },
                    () => { Assert.True(false); }
                );

                s = Observer.Synchronize(o, true);

                s.OnNext(1);

                Assert.True(res);
            }

            {
                var res = default(Exception);

                var err = new Exception();

                var s = default(IObserver<int>);

                var o = Observer.Create<int>(
                    x => { Assert.True(false); },
                    ex => { res = ex; },
                    () => { Assert.True(false); }
                );

                s = Observer.Synchronize(o, true);

                s.OnError(err);

                Assert.Same(err, res);
            }

            {
                var res = false;

                var s = default(IObserver<int>);

                var o = Observer.Create<int>(
                    x => { Assert.True(false); },
                    ex => { Assert.True(false); },
                    () => { res = true; }
                );

                s = Observer.Synchronize(o, true);

                s.OnCompleted();

                Assert.True(res);
            }
        }

        // Concurrent-producer scenario terminated with OnCompleted.
        [TestMethod]
        public void Observer_Synchronize_OnCompleted()
        {
            Observer_Synchronize(true);
        }

        // Concurrent-producer scenario terminated with OnError.
        [TestMethod]
        public void Observer_Synchronize_OnError()
        {
            Observer_Synchronize(false);
        }

        // Pushes N * M values into a synchronized observer from N scheduled work items and checks that
        // the handlers never observe each other in progress (the busy flag) and that no value is lost.
        // success selects the terminal notification: OnCompleted when true, OnError when false.
        private void Observer_Synchronize(bool success)
        {
            var busy = false;
            var n = 0;
            var ex = default(Exception);
            var done = false;

            var o = Observer.Create<int>(
                _ =>
                {
                    Assert.False(busy);
                    busy = true;
                    n++;
                    busy = false;
                },
                _ =>
                {
                    Assert.False(busy);
                    busy = true;
                    ex = _;
                    busy = false;
                },
                () =>
                {
                    Assert.False(busy);
                    busy = true;
                    done = true;
                    busy = false;
                }
            );

            var s = Observer.Synchronize(o);

            var N = 10;
            var M = 1000;

            var e = new CountdownEvent(N);
            for (var i = 0; i < N; i++)
            {
                Scheduler.Default.Schedule(() =>
                {
                    for (var j = 0; j < M; j++)
                    {
                        s.OnNext(j);
                    }

                    e.Signal();
                });
            }

            // Wait until every producer has pushed all of its values.
            e.Wait();

            if (success)
            {
                s.OnCompleted();
                Assert.True(done);
            }
            else
            {
                var err = new Exception();
                s.OnError(err);
                Assert.Same(err, ex);
            }

            Assert.Equal(N * M, n);
        }

        // NotifyOn must reject a null observer, a null scheduler and a null synchronization context.
        [TestMethod]
        public void NotifyOn_Null()
        {
            ReactiveAssert.Throws<ArgumentNullException>(() => Observer.NotifyOn(default(IObserver<int>), Scheduler.Immediate));
            ReactiveAssert.Throws<ArgumentNullException>(() => Observer.NotifyOn(new MyObserver(), default(IScheduler)));
            ReactiveAssert.Throws<ArgumentNullException>(() => Observer.NotifyOn(default(IObserver<int>), new MySyncCtx()));
            ReactiveAssert.Throws<ArgumentNullException>(() => Observer.NotifyOn(new MyObserver(), default(SynchronizationContext)));
        }

        // NotifyOn(scheduler) scenario terminated with OnCompleted.
        [TestMethod]
        public void NotifyOn_Scheduler_OnCompleted()
        {
            NotifyOn_Scheduler(true);
        }

        // NotifyOn(scheduler) scenario terminated with OnError.
        [TestMethod]
        public void NotifyOn_Scheduler_OnError()
        {
            NotifyOn_Scheduler(false);
        }

        // Sends N values followed by a terminal notification through NotifyOn(scheduler) from a dedicated
        // thread, then checks that all N values arrived before the terminal notification was signalled.
        // success selects the terminal notification: OnCompleted when true, OnError when false.
        private void NotifyOn_Scheduler(bool success)
        {
            var e = new ManualResetEvent(false);

            var c = 0;
            var N = 100;
            var err = new Exception();

            var o = Observer.Create<int>(
                x =>
                {
                    c++;
#if DESKTOPCLR
                    Assert.True(Thread.CurrentThread.IsThreadPoolThread);
#endif
                },
                ex =>
                {
                    Assert.Same(err, ex);
                    e.Set();
                },
                () =>
                {
#if DESKTOPCLR
                    Assert.True(Thread.CurrentThread.IsThreadPoolThread);
#endif
                    e.Set();
                }
            );

            var s = ThreadPoolScheduler.Instance.DisableOptimizations(/* long-running creates a non-threadpool thread */);
            var n = Observer.NotifyOn(o, s);

            new Thread(() =>
            {
                for (var i = 0; i < N; i++)
                {
                    n.OnNext(i);
                }

                if (success)
                {
                    n.OnCompleted();
                }
                else
                {
                    n.OnError(err);
                }
            }).Start();

            // Block until the terminal notification has been delivered to the target observer.
            e.WaitOne();
            Assert.Equal(N, c);
        }

        // Values pushed through NotifyOn(SynchronizationContext) reach the target observer in order,
        // followed by completion.
        [TestMethod]
        public void NotifyOn_SyncCtx()
        {
            var lst = new List<int>();
            var don = new ManualResetEvent(false);
            var obs = Observer.Create<int>(x => { lst.Add(x); }, ex => { Assert.True(false); }, () => { don.Set(); });
            var ctx = new MySyncCtx();
            var res = obs.NotifyOn(ctx);

            // Drive the NotifyOn wrapper (res), not the underlying observer, so that the
            // synchronization-context hop is what is actually under test.
            for (var i = 0; i < 100; i++)
            {
                res.OnNext(i);
            }

            res.OnCompleted();
            don.WaitOne();

            Assert.True(lst.SequenceEqual(Enumerable.Range(0, 100)));
        }

        // Synchronization context whose Post runs the callback via Task.Run.
        private class MySyncCtx : SynchronizationContext
        {
            public override void Post(SendOrPostCallback d, object state)
            {
                Task.Run(() => d(state));
            }
        }

        // ToProgress must reject a null observer and a null scheduler.
        [TestMethod]
        public void Observer_ToProgress_ArgumentChecking()
        {
            var s = Scheduler.Immediate;
            var o = Observer.Create<int>(_ => { });

            ReactiveAssert.Throws<ArgumentNullException>(() => Observer.ToProgress<int>(default));

            ReactiveAssert.Throws<ArgumentNullException>(() => Observer.ToProgress<int>(default, s));
            ReactiveAssert.Throws<ArgumentNullException>(() => Observer.ToProgress(o, default));
        }

        // Report calls on the progress adapter are forwarded to the observer's OnNext in order.
        [TestMethod]
        public void Observer_ToProgress()
        {
            var xs = new List<int>();

            var p = Observer.Create<int>(xs.Add).ToProgress();

            p.Report(42);
            p.Report(43);

            Assert.True(xs.SequenceEqual(new[] { 42, 43 }));
        }

        // With a scheduler, each Report is delivered to the observer through that scheduler
        // (observed here as virtual-time ticks 201 and 202 for reports issued at 200).
        [TestMethod]
        public void Observer_ToProgress_Scheduler()
        {
            var s = new TestScheduler();

            var o = s.CreateObserver<int>();

            var p = o.ToProgress(s);

            s.ScheduleAbsolute(200, () =>
            {
                p.Report(42);
                p.Report(43);
            });

            s.Start();

            o.Messages.AssertEqual(
                OnNext(201, 42),
                OnNext(202, 43)
            );
        }

        // ToObserver(IProgress) must reject a null progress instance.
        [TestMethod]
        public void Progress_ToObserver_ArgumentChecking()
        {
            ReactiveAssert.Throws<ArgumentNullException>(() => Observer.ToObserver(default(IProgress<int>)));
        }

        // OnNext calls on the observer adapter are forwarded to IProgress.Report in order.
        [TestMethod]
        public void Progress_ToObserver()
        {
            var xs = new List<int>();

            var p = new MyProgress<int>(xs.Add);

            var o = p.ToObserver();

            o.OnNext(42);
            o.OnNext(43);

            Assert.True(xs.SequenceEqual(new[] { 42, 43 }));
        }

        // Minimal IProgress implementation that forwards each reported value to a delegate.
        private class MyProgress<T> : IProgress<T>
        {
            private readonly Action<T> _report;

            public MyProgress(Action<T> report)
            {
                _report = report;
            }

            public void Report(T value)
            {
                _report(value);
            }
        }
    }
}