123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 |
- // Licensed to the .NET Foundation under one or more agreements.
- // The .NET Foundation licenses this file to you under the MIT License.
- // See the LICENSE file in the project root for more information.
- using System;
- using System.Linq;
- using System.Reactive.Concurrency;
- using System.Reactive.Linq;
- using System.Threading;
- using Microsoft.Reactive.Testing;
- using Microsoft.VisualStudio.TestTools.UnitTesting;
- using Assert = Xunit.Assert;
- namespace ReactiveTests.Tests
- {
- [TestClass]
- public class SynchronizeTest : TestBase
- {
- [TestMethod]
- public void Synchronize_ArgumentChecking()
- {
- var someObservable = Observable.Empty<int>();
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Synchronize<int>(default));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Synchronize<int>(default, new object()));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Synchronize(someObservable, null));
- }
- [TestMethod]
- public void Synchronize_Range()
- {
- var i = 0;
- var outsideLock = true;
- var gate = new object();
- lock (gate)
- {
- outsideLock = false;
- Observable.Range(0, 100, NewThreadScheduler.Default).Synchronize(gate).Subscribe(x => i++, () => { Assert.True(outsideLock); });
- Thread.Sleep(100);
- Assert.Equal(0, i);
- outsideLock = true;
- }
- while (i < 100)
- {
- Thread.Sleep(10);
- lock (gate)
- {
- var start = i;
- Thread.Sleep(100);
- Assert.Equal(start, i);
- }
- }
- }
- [TestMethod]
- public void Synchronize_Throw()
- {
- var ex = new Exception();
- var resLock = new object();
- var e = default(Exception);
- var outsideLock = true;
- var gate = new object();
- lock (gate)
- {
- outsideLock = false;
- Observable.Throw<int>(ex, NewThreadScheduler.Default).Synchronize(gate).Subscribe(x => { Assert.True(false); }, err => { lock (resLock) { e = err; } }, () => { Assert.True(outsideLock); });
- Thread.Sleep(100);
- Assert.Null(e);
- outsideLock = true;
- }
- while (true)
- {
- lock (resLock)
- {
- if (e != null)
- {
- break;
- }
- }
- }
- Assert.Same(ex, e);
- }
- [TestMethod]
- public void Synchronize_BadObservable()
- {
- var o = Observable.Create<int>(obs =>
- {
- var t1 = new Thread(() =>
- {
- for (var i = 0; i < 100; i++)
- {
- obs.OnNext(i);
- }
- });
- new Thread(() =>
- {
- t1.Start();
- for (var i = 100; i < 200; i++)
- {
- obs.OnNext(i);
- }
- t1.Join();
- obs.OnCompleted();
- }).Start();
- return () => { };
- });
- var evt = new ManualResetEvent(false);
- var sum = 0;
- o.Synchronize().Subscribe(x => sum += x, () => { evt.Set(); });
- evt.WaitOne();
- Assert.Equal(Enumerable.Range(0, 200).Sum(), sum);
- }
- }
- }
|