// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Assert = Xunit.Assert;

namespace ReactiveTests.Tests
{
    /// <summary>
    /// Tests for the <c>Observable.RefCount</c> overloads applied to connectable observables:
    /// the plain form, the form taking a minimum observer count, and the forms taking a
    /// disconnect delay (the tests named <c>LazyRefCount_*</c> and <c>*_Lazy</c>).
    /// </summary>
    [TestClass]
    public class RefCountTest : ReactiveTest
    {
        /// <summary>
        /// Wraps a connectable observable of <see cref="Notification{T}"/> and exposes it as a
        /// connectable observable of <typeparamref name="T"/>: each subscription goes through
        /// <c>Dematerialize()</c>, while <see cref="Connect"/> is forwarded unchanged.
        /// </summary>
        private sealed class DematerializingConnectableObservable<T> : IConnectableObservable<T>
        {
            private readonly IConnectableObservable<Notification<T>> _subject;

            public DematerializingConnectableObservable(IConnectableObservable<Notification<T>> subject)
            {
                _subject = subject;
            }

            /// <summary>Subscribes <paramref name="observer"/> to the dematerialized view of the wrapped source.</summary>
            public IDisposable Subscribe(IObserver<T> observer)
            {
                return _subject.Dematerialize().Subscribe(observer);
            }

            /// <summary>Connects the wrapped connectable observable and returns its connection handle.</summary>
            public IDisposable Connect()
            {
                return _subject.Connect();
            }
        }

        /// <summary>
        /// A null source must raise <see cref="ArgumentNullException"/>; a minimum observer
        /// count of zero or less must raise <see cref="ArgumentOutOfRangeException"/>.
        /// </summary>
        [TestMethod]
        public void RefCount_ArgumentChecking()
        {
            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RefCount<int>(null));
            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RefCount<int>(null, 2));
            ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.RefCount(Observable.Never<int>().Publish(), 0));
            ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.RefCount(Observable.Never<int>().Publish(), -1));
            ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.RefCount(Observable.Never<int>().Publish(), -2));
        }

        /// <summary>
        /// A single subscriber to <c>RefCount()</c> receives every message of the hot source,
        /// and the subject's <c>Disposed</c> flag is set once the run is over.
        /// </summary>
        [TestMethod]
        public void RefCount_ConnectsOnFirst()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(210, 1),
                OnNext(220, 2),
                OnNext(230, 3),
                OnNext(240, 4),
                OnCompleted<int>(250)
            );

            var subject = new MySubject();

            var conn = new ConnectableObservable<int>(xs, subject);

            var res = scheduler.Start(() =>
                conn.RefCount()
            );

            res.Messages.AssertEqual(
                OnNext(210, 1),
                OnNext(220, 2),
                OnNext(230, 3),
                OnNext(240, 4),
                OnCompleted<int>(250)
            );

            Assert.True(subject.Disposed);
        }

        /// <summary>
        /// Tracks how often the deferred source is created (<c>count</c>) and whether its
        /// teardown ran (<c>disconnected</c>) across two overlapping subscriptions followed
        /// by a third, later one.
        /// </summary>
        [TestMethod]
        public void RefCount_NotConnected()
        {
            var disconnected = false;
            var count = 0;

            var xs = Observable.Defer(() =>
            {
                count++;
                return Observable.Create<int>(obs =>
                {
                    return () => { disconnected = true; };
                });
            });

            var subject = new MySubject();

            var conn = new ConnectableObservable<int>(xs, subject);
            var refd = conn.RefCount();

            // First subscriber: the source is created once.
            var dis1 = refd.Subscribe();
            Assert.Equal(1, count);
            Assert.Equal(1, subject.SubscribeCount);
            Assert.False(disconnected);

            // Second subscriber shares the existing connection (count stays at 1).
            var dis2 = refd.Subscribe();
            Assert.Equal(1, count);
            Assert.Equal(2, subject.SubscribeCount);
            Assert.False(disconnected);

            // Only disposing the last remaining subscription runs the source teardown.
            dis1.Dispose();
            Assert.False(disconnected);
            dis2.Dispose();
            Assert.True(disconnected);

            disconnected = false;

            // A later subscriber creates the source a second time.
            var dis3 = refd.Subscribe();
            Assert.Equal(2, count);
            Assert.Equal(3, subject.SubscribeCount);
            Assert.False(disconnected);

            dis3.Dispose();
            Assert.True(disconnected);
        }

        /// <summary>
        /// Both subscriptions to a published, ref-counted <c>Throw</c> source must only ever
        /// see the original exception instance; OnNext and OnCompleted fail the test.
        /// </summary>
        [TestMethod]
        public void RefCount_OnError()
        {
            var ex = new Exception();
            var xs = Observable.Throw<int>(ex, Scheduler.Immediate);

            var res = xs.Publish().RefCount();

            res.Subscribe(_ => { Assert.True(false); }, ex_ => { Assert.Same(ex, ex_); }, () => { Assert.True(false); });
            res.Subscribe(_ => { Assert.True(false); }, ex_ => { Assert.Same(ex, ex_); }, () => { Assert.True(false); });
        }

        /// <summary>
        /// Four observers subscribe and unsubscribe at scheduled times; checks the messages
        /// each one records and the subscription windows recorded on the hot source.
        /// </summary>
        [TestMethod]
        public void RefCount_Publish()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(210, 1),
                OnNext(220, 2),
                OnNext(230, 3),
                OnNext(240, 4),
                OnNext(250, 5),
                OnNext(260, 6),
                OnNext(270, 7),
                OnNext(280, 8),
                OnNext(290, 9),
                OnCompleted<int>(300)
            );

            var res = xs.Publish().RefCount();

            var d1 = default(IDisposable);
            var o1 = scheduler.CreateObserver<int>();
            scheduler.ScheduleAbsolute(215, () => { d1 = res.Subscribe(o1); });
            scheduler.ScheduleAbsolute(235, () => { d1.Dispose(); });

            var d2 = default(IDisposable);
            var o2 = scheduler.CreateObserver<int>();
            scheduler.ScheduleAbsolute(225, () => { d2 = res.Subscribe(o2); });
            scheduler.ScheduleAbsolute(275, () => { d2.Dispose(); });

            var d3 = default(IDisposable);
            var o3 = scheduler.CreateObserver<int>();
            scheduler.ScheduleAbsolute(255, () => { d3 = res.Subscribe(o3); });
            scheduler.ScheduleAbsolute(265, () => { d3.Dispose(); });

            var d4 = default(IDisposable);
            var o4 = scheduler.CreateObserver<int>();
            scheduler.ScheduleAbsolute(285, () => { d4 = res.Subscribe(o4); });
            scheduler.ScheduleAbsolute(320, () => { d4.Dispose(); });

            scheduler.Start();

            o1.Messages.AssertEqual(
                OnNext(220, 2),
                OnNext(230, 3)
            );

            o2.Messages.AssertEqual(
                OnNext(230, 3),
                OnNext(240, 4),
                OnNext(250, 5),
                OnNext(260, 6),
                OnNext(270, 7)
            );

            o3.Messages.AssertEqual(
                OnNext(260, 6)
            );

            o4.Messages.AssertEqual(
                OnNext(290, 9),
                OnCompleted<int>(300)
            );

            // First window: first subscribe (215) until the last active observer leaves (275).
            // Second window: o4 subscribes (285) until the source completes (300).
            xs.Subscriptions.AssertEqual(
                Subscribe(215, 275),
                Subscribe(285, 300)
            );
        }

        /// <summary>
        /// Subscribes three times to the same ref-counted observable, pushing a value, then a
        /// completion, then a value again into the replaying subject, and checks what each
        /// subscription observes.
        /// </summary>
        [TestMethod]
        public void RefCount_can_connect_again_if_previous_subscription_terminated_synchronously()
        {
            var seen = 0;
            var terminated = false;

            var subject = new ReplaySubject<Notification<int>>(1);
            var connectable = new DematerializingConnectableObservable<int>(subject.Publish());
            var refCount = connectable.RefCount();

            subject.OnNext(Notification.CreateOnNext(36));

            using (refCount.Subscribe(value => seen = value, () => terminated = true))
            {
                Assert.Equal(36, seen);
            }

            seen = 0;
            terminated = false;
            subject.OnNext(Notification.CreateOnCompleted<int>());

            // The materialized completion terminates this subscription without any value.
            using (refCount.Subscribe(value => seen = value, () => terminated = true))
            {
                Assert.Equal(0, seen);
                Assert.True(terminated);
            }

            seen = 0;
            terminated = false;
            subject.OnNext(Notification.CreateOnNext(36));

            // A further subscription must deliver values again.
            using (refCount.Subscribe(value => seen = value, () => terminated = true))
            {
                Assert.Equal(36, seen);
                Assert.False(terminated);
            }
        }

        /// <summary>
        /// Argument validation for every overload that takes a disconnect delay: null source
        /// or null scheduler must raise <see cref="ArgumentNullException"/>, and a minimum
        /// observer count of zero or less must raise <see cref="ArgumentOutOfRangeException"/>.
        /// </summary>
        [TestMethod]
        public void LazyRefCount_ArgumentChecking()
        {
            // Null source, one call per overload shape.
            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RefCount<int>(null, TimeSpan.FromSeconds(2)));
            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RefCount<int>(null, TimeSpan.FromSeconds(2), Scheduler.Default));
            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RefCount<int>(null, 2, TimeSpan.FromSeconds(2)));
            // Previously this line duplicated the three-argument call above, leaving the
            // four-argument overload's null-source check untested.
            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RefCount<int>(null, 2, TimeSpan.FromSeconds(2), Scheduler.Default));

            // Null scheduler.
            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RefCount(Observable.Never<int>().Publish(), TimeSpan.FromSeconds(2), null));

            // Invalid minimum observer count without a scheduler.
            ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.RefCount(Observable.Never<int>().Publish(), 0, TimeSpan.FromSeconds(2)));
            ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.RefCount(Observable.Never<int>().Publish(), -1, TimeSpan.FromSeconds(2)));

            // Four-argument overload: null scheduler and invalid minimum observer count.
            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RefCount(Observable.Never<int>().Publish(), 2, TimeSpan.FromSeconds(2), null));
            ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.RefCount(Observable.Never<int>().Publish(), 0, TimeSpan.FromSeconds(2), Scheduler.Default));
            ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.RefCount(Observable.Never<int>().Publish(), -1, TimeSpan.FromSeconds(2), Scheduler.Default));
        }

        /// <summary>
        /// Same scenario as <see cref="RefCount_ConnectsOnFirst"/>, using the overload that
        /// takes a disconnect delay.
        /// </summary>
        [TestMethod]
        public void LazyRefCount_ConnectsOnFirst()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(210, 1),
                OnNext(220, 2),
                OnNext(230, 3),
                OnNext(240, 4),
                OnCompleted<int>(250)
            );

            var subject = new MySubject();

            var conn = new ConnectableObservable<int>(xs, subject);

            var res = scheduler.Start(() =>
                conn.RefCount(TimeSpan.FromSeconds(2))
            );

            res.Messages.AssertEqual(
                OnNext(210, 1),
                OnNext(220, 2),
                OnNext(230, 3),
                OnNext(240, 4),
                OnCompleted<int>(250)
            );

            Assert.True(subject.Disposed);
        }

        /// <summary>
        /// Same scenario as <see cref="RefCount_NotConnected"/> with a 20-tick disconnect
        /// delay on a test scheduler: the source teardown is only expected once the
        /// scheduler has advanced by the full delay after the last subscription is disposed.
        /// </summary>
        [TestMethod]
        public void LazyRefCount_NotConnected()
        {
            var scheduler = new TestScheduler();
            var disconnected = false;
            var count = 0;

            var xs = Observable.Defer(() =>
            {
                count++;
                return Observable.Create<int>(obs =>
                {
                    return () => { disconnected = true; };
                });
            });

            var subject = new MySubject();

            var conn = new ConnectableObservable<int>(xs, subject);
            var refd = conn.RefCount(TimeSpan.FromTicks(20), scheduler);

            var dis1 = refd.Subscribe();
            Assert.Equal(1, count);
            Assert.Equal(1, subject.SubscribeCount);
            Assert.False(disconnected);

            var dis2 = refd.Subscribe();
            Assert.Equal(1, count);
            Assert.Equal(2, subject.SubscribeCount);
            Assert.False(disconnected);

            dis1.Dispose();
            Assert.False(disconnected);
            dis2.Dispose();
            Assert.False(disconnected);

            // Still connected one tick before the delay elapses...
            scheduler.AdvanceBy(19);
            Assert.False(disconnected);

            // ...and torn down once the full 20 ticks have passed.
            scheduler.AdvanceBy(1);
            Assert.True(disconnected);

            disconnected = false;

            var dis3 = refd.Subscribe();
            Assert.Equal(2, count);
            Assert.Equal(3, subject.SubscribeCount);
            Assert.False(disconnected);

            dis3.Dispose();
            scheduler.AdvanceBy(20);
            Assert.True(disconnected);
        }

        /// <summary>
        /// Same scenario as <see cref="RefCount_OnError"/>, using the overload that takes a
        /// disconnect delay; OnNext and OnCompleted throw to fail the test.
        /// </summary>
        [TestMethod]
        public void LazyRefCount_OnError()
        {
            var ex = new Exception();
            var xs = Observable.Throw<int>(ex, Scheduler.Immediate);

            var res = xs.Publish().RefCount(TimeSpan.FromSeconds(2));

            res.Subscribe(_ => throw new Exception(), ex_ => { Assert.Same(ex, ex_); }, () => throw new Exception());
            res.Subscribe(_ => throw new Exception(), ex_ => { Assert.Same(ex, ex_); }, () => throw new Exception());
        }

        /// <summary>
        /// Same scenario as <see cref="RefCount_Publish"/> with a 9-tick disconnect delay on
        /// the test scheduler; the first source subscription window is expected to end at
        /// 284 rather than 275.
        /// </summary>
        [TestMethod]
        public void LazyRefCount_Publish()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(210, 1),
                OnNext(220, 2),
                OnNext(230, 3),
                OnNext(240, 4),
                OnNext(250, 5),
                OnNext(260, 6),
                OnNext(270, 7),
                OnNext(280, 8),
                OnNext(290, 9),
                OnCompleted<int>(300)
            );

            var res = xs.Publish().RefCount(TimeSpan.FromTicks(9), scheduler);

            var d1 = default(IDisposable);
            var o1 = scheduler.CreateObserver<int>();
            scheduler.ScheduleAbsolute(215, () => { d1 = res.Subscribe(o1); });
            scheduler.ScheduleAbsolute(235, () => { d1.Dispose(); });

            var d2 = default(IDisposable);
            var o2 = scheduler.CreateObserver<int>();
            scheduler.ScheduleAbsolute(225, () => { d2 = res.Subscribe(o2); });
            scheduler.ScheduleAbsolute(275, () =>
            {
                d2.Dispose();
            });

            var d3 = default(IDisposable);
            var o3 = scheduler.CreateObserver<int>();
            scheduler.ScheduleAbsolute(255, () => { d3 = res.Subscribe(o3); });
            scheduler.ScheduleAbsolute(265, () => { d3.Dispose(); });

            var d4 = default(IDisposable);
            var o4 = scheduler.CreateObserver<int>();
            scheduler.ScheduleAbsolute(285, () => { d4 = res.Subscribe(o4); });
            scheduler.ScheduleAbsolute(320, () => { d4.Dispose(); });

            scheduler.Start();

            o1.Messages.AssertEqual(
                OnNext(220, 2),
                OnNext(230, 3)
            );

            o2.Messages.AssertEqual(
                OnNext(230, 3),
                OnNext(240, 4),
                OnNext(250, 5),
                OnNext(260, 6),
                OnNext(270, 7)
            );

            o3.Messages.AssertEqual(
                OnNext(260, 6)
            );

            o4.Messages.AssertEqual(
                OnNext(290, 9),
                OnCompleted<int>(300)
            );

            // Last observer leaves at 275; with the 9-tick delay the disconnect is expected
            // at 284, just before o4 subscribes at 285.
            xs.Subscriptions.AssertEqual(
                Subscribe(215, 284),
                Subscribe(285, 300)
            );
        }

        /// <summary>
        /// When the source completes synchronously during the first subscription, a second
        /// subscription must not subscribe to the source again.
        /// </summary>
        [TestMethod]
        public void RefCount_source_already_completed_synchronously()
        {
            var subscribed = 0;
            var unsubscribed = 0;

            var o1 = Observable.Create<string>(observer =>
            {
                subscribed++;
                observer.OnCompleted();

                return Disposable.Create(() => unsubscribed++);
            });

            var o2 = o1.Publish().RefCount();

            o2.Subscribe();
            Assert.Equal(1, subscribed);
            Assert.Equal(1, unsubscribed);

            o2.Subscribe();
            Assert.Equal(1, subscribed);
            Assert.Equal(1, unsubscribed);
        }

        /// <summary>
        /// With a minimum of two observers, a single subscription must not connect the source.
        /// </summary>
        [TestMethod]
        public void RefCount_minObservers_not_connected_Eager()
        {
            var connected = 0;

            var source = Observable.Defer(() =>
            {
                connected++;
                return Observable.Never<int>();
            })
            .Publish()
            .RefCount(2);

            Assert.Equal(0, connected);

            source.Subscribe();

            Assert.Equal(0, connected);
        }

        /// <summary>
        /// With a minimum of two observers, the source connects on the second subscription
        /// and both observers receive the full range.
        /// </summary>
        [TestMethod]
        public void RefCount_minObservers_connected_Eager()
        {
            var connected = 0;

            var source = Observable.Defer(() =>
            {
                connected++;
                return Observable.Range(1, 5);
            })
            .Publish()
            .RefCount(2);

            Assert.Equal(0, connected);

            var list1 = new List<int>();
            source.Subscribe(list1.Add);

            Assert.Equal(0, connected);
            Assert.Empty(list1);

            var list2 = new List<int>();
            source.Subscribe(list2.Add);

            Assert.Equal(1, connected);

            List<int> expected = [1, 2, 3, 4, 5];

            Assert.Equal(expected, list1);
            Assert.Equal(expected, list2);
        }

        /// <summary>
        /// Same as <see cref="RefCount_minObservers_not_connected_Eager"/>, using the overload
        /// that also takes a disconnect delay.
        /// </summary>
        [TestMethod]
        public void RefCount_minObservers_not_connected_Lazy()
        {
            var connected = 0;

            var source = Observable.Defer(() =>
            {
                connected++;
                return Observable.Never<int>();
            })
            .Publish()
            .RefCount(2, TimeSpan.FromMinutes(1));

            Assert.Equal(0, connected);

            source.Subscribe();

            Assert.Equal(0, connected);
        }

        /// <summary>
        /// Same as <see cref="RefCount_minObservers_connected_Eager"/>, using the overload
        /// that also takes a disconnect delay.
        /// </summary>
        [TestMethod]
        public void RefCount_minObservers_connected_Lazy()
        {
            var connected = 0;

            var source = Observable.Defer(() =>
            {
                connected++;
                return Observable.Range(1, 5);
            })
            .Publish()
            .RefCount(2, TimeSpan.FromMinutes(1));

            Assert.Equal(0, connected);

            var list1 = new List<int>();
            source.Subscribe(list1.Add);

            Assert.Equal(0, connected);
            Assert.Empty(list1);

            var list2 = new List<int>();
            source.Subscribe(list2.Add);

            Assert.Equal(1, connected);

            List<int> expected = [1, 2, 3, 4, 5];

            Assert.Equal(expected, list1);
            Assert.Equal(expected, list2);
        }
    }
}