SynchronizeTest.cs 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Linq;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Linq;
  8. using System.Threading;
  9. using Microsoft.Reactive.Testing;
  10. using Xunit;
  11. namespace ReactiveTests.Tests
  12. {
  13. public class SynchronizeTest : TestBase
  14. {
  15. [Fact]
  16. public void Synchronize_ArgumentChecking()
  17. {
  18. var someObservable = Observable.Empty<int>();
  19. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Synchronize<int>(default));
  20. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Synchronize<int>(default, new object()));
  21. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Synchronize(someObservable, null));
  22. }
  23. #if !NO_THREAD
  24. [Fact]
  25. public void Synchronize_Range()
  26. {
  27. var i = 0;
  28. var outsideLock = true;
  29. var gate = new object();
  30. lock (gate)
  31. {
  32. outsideLock = false;
  33. Observable.Range(0, 100, NewThreadScheduler.Default).Synchronize(gate).Subscribe(x => i++, () => { Assert.True(outsideLock); });
  34. Thread.Sleep(100);
  35. Assert.Equal(0, i);
  36. outsideLock = true;
  37. }
  38. while (i < 100)
  39. {
  40. Thread.Sleep(10);
  41. lock (gate)
  42. {
  43. var start = i;
  44. Thread.Sleep(100);
  45. Assert.Equal(start, i);
  46. }
  47. }
  48. }
  49. [Fact]
  50. public void Synchronize_Throw()
  51. {
  52. var ex = new Exception();
  53. var resLock = new object();
  54. var e = default(Exception);
  55. var outsideLock = true;
  56. var gate = new object();
  57. lock (gate)
  58. {
  59. outsideLock = false;
  60. Observable.Throw<int>(ex, NewThreadScheduler.Default).Synchronize(gate).Subscribe(x => { Assert.True(false); }, err => { lock (resLock) { e = err; } }, () => { Assert.True(outsideLock); });
  61. Thread.Sleep(100);
  62. Assert.Null(e);
  63. outsideLock = true;
  64. }
  65. while (true)
  66. {
  67. lock (resLock)
  68. {
  69. if (e != null)
  70. {
  71. break;
  72. }
  73. }
  74. }
  75. Assert.Same(ex, e);
  76. }
  77. [Fact]
  78. public void Synchronize_BadObservable()
  79. {
  80. var o = Observable.Create<int>(obs =>
  81. {
  82. var t1 = new Thread(() =>
  83. {
  84. for (var i = 0; i < 100; i++)
  85. {
  86. obs.OnNext(i);
  87. }
  88. });
  89. new Thread(() =>
  90. {
  91. t1.Start();
  92. for (var i = 100; i < 200; i++)
  93. {
  94. obs.OnNext(i);
  95. }
  96. t1.Join();
  97. obs.OnCompleted();
  98. }).Start();
  99. return () => { };
  100. });
  101. var evt = new ManualResetEvent(false);
  102. var sum = 0;
  103. o.Synchronize().Subscribe(x => sum += x, () => { evt.Set(); });
  104. evt.WaitOne();
  105. Assert.Equal(Enumerable.Range(0, 200).Sum(), sum);
  106. }
  107. #endif
  108. }
  109. }