SynchronizeTest.cs 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. using System.Reactive;
  10. using System.Reactive.Concurrency;
  11. using System.Reactive.Linq;
  12. using Microsoft.Reactive.Testing;
  13. using Xunit;
  14. using ReactiveTests.Dummies;
  15. using System.Reflection;
  16. using System.Threading;
  17. namespace ReactiveTests.Tests
  18. {
  19. public class SynchronizeTest : TestBase
  20. {
  // Verifies that every Synchronize overload rejects null arguments with an
  // ArgumentNullException: a null source (with and without an explicit gate)
  // and a null gate object.
  [Fact]
  public void Synchronize_ArgumentChecking()
  {
      IObservable<int> validSource = Observable.Empty<int>();
      IObservable<int> missingSource = null;
      object missingGate = null;

      // Null source, implicit gate.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Synchronize<int>(missingSource));

      // Null source, explicit gate.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Synchronize<int>(missingSource, new object()));

      // Valid source, null gate.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Synchronize<int>(validSource, missingGate));
  }
  29. #if !NO_THREAD
  // Subscribes to a synchronized Range(0, 100) produced on a new thread while
  // the test itself holds the gate, and checks that:
  //   * no element is counted while the test holds the gate, and
  //   * the counter never moves during any later period in which the test
  //     re-acquires the gate.
  [Fact]
  public void Synchronize_Range()
  {
      var gate = new object();
      var received = 0;
      var gateReleased = true;

      lock (gate)
      {
          gateReleased = false;

          var source = Observable.Range(0, 100, NewThreadScheduler.Default);
          var synchronized = source.Synchronize(gate);
          synchronized.Subscribe(
              _ => received++,
              () => { Assert.True(gateReleased); });

          // Give the producer thread time to run; nothing may be counted
          // while this thread still owns the gate.
          Thread.Sleep(100);
          Assert.Equal(0, received);

          gateReleased = true;
      }

      // Keep re-taking the gate until all 100 elements have been counted; the
      // counter must stay frozen for as long as the gate is held here.
      while (received < 100)
      {
          Thread.Sleep(10);

          lock (gate)
          {
              var snapshot = received;
              Thread.Sleep(100);
              Assert.Equal(snapshot, received);
          }
      }
  }
  // Subscribes to a synchronized Throw(ex) scheduled on a new thread while the
  // test holds the gate, and checks that the error is not observed until the
  // gate is released and that the observed exception is the same instance
  // that was thrown.
  [Fact]
  public void Synchronize_Throw()
  {
      var ex = new Exception();
      var e = default(Exception);
      bool outsideLock = true;
      var gate = new object();

      // Signalled by the OnError callback. Waiting on it replaces a busy
      // polling loop, so the test neither spins a core nor hangs forever if
      // the error is never delivered.
      using (var errorSeen = new ManualResetEvent(false))
      {
          lock (gate)
          {
              outsideLock = false;
              Observable.Throw<int>(ex, NewThreadScheduler.Default).Synchronize(gate).Subscribe(
                  x => { Assert.True(false); },
                  err =>
                  {
                      // Publish the exception before signalling so the waiting
                      // thread sees it once WaitOne returns.
                      e = err;
                      errorSeen.Set();
                  },
                  () => { Assert.True(outsideLock); });

              // Give the producer thread time to run; the error must not be
              // delivered while this thread still owns the gate.
              Thread.Sleep(100);
              Assert.Null(e);
              outsideLock = true;
          }

          Assert.True(
              errorSeen.WaitOne(TimeSpan.FromSeconds(30)),
              "OnError was not delivered within 30 seconds of releasing the gate.");
      }

      Assert.Same(ex, e);
  }
  // Builds an observable whose subscribe logic calls OnNext from two threads
  // with no coordination between them (values 0..99 on one thread, 100..199
  // on the other), then checks that summing the elements through
  // Synchronize() yields the exact total of 0..199.
  [Fact]
  public void Synchronize_BadObservable()
  {
      var o = Observable.Create<int>(obs =>
      {
          var t1 = new Thread(() =>
          {
              for (int i = 0; i < 100; i++)
              {
                  obs.OnNext(i);
              }
          });

          new Thread(() =>
          {
              // Start the first producer from here so both loops run
              // concurrently, then wait for it before completing.
              t1.Start();
              for (int i = 100; i < 200; i++)
              {
                  obs.OnNext(i);
              }
              t1.Join();
              obs.OnCompleted();
          }).Start();

          return () => { };
      });

      int sum = 0;

      // The wait handle is disposed deterministically, and the wait is
      // bounded so a lost OnCompleted fails the test instead of hanging it.
      using (var evt = new ManualResetEvent(false))
      {
          o.Synchronize().Subscribe(x => sum += x, () => { evt.Set(); });

          Assert.True(
              evt.WaitOne(TimeSpan.FromSeconds(30)),
              "OnCompleted was not delivered within 30 seconds.");
      }

      Assert.Equal(Enumerable.Range(0, 200).Sum(), sum);
  }
  111. #endif
  112. }
  113. }