ReplaySubject.cs 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. #if STRESS
  5. using System;
  6. using System.Collections.Generic;
  7. using System.Diagnostics;
  8. using System.Linq;
  9. using System.Reactive.Linq;
  10. using System.Reactive.Subjects;
  11. using System.Threading.Tasks;
  12. namespace ReactiveTests.Stress.Linq
  13. {
  /// <summary>
  /// Stress test that drives a <c>ReplaySubject&lt;int&gt;</c> (buffer size 4) with one
  /// producer and several concurrently (re)subscribing consumers.
  /// </summary>
  public class ReplaySubject
  {
      /// <summary>
      /// Tests the ReplaySubject with concurrent subscribers.
      /// </summary>
      /// <remarks>
      /// Sixteen subscriber tasks repeatedly subscribe to the subject, take a randomly chosen
      /// number of items (10..999, fixed per task) and verify that the items received form a
      /// run of consecutive integers. Meanwhile the calling thread pushes the values
      /// 0 .. int.MaxValue - 1 into the subject, completes it, and then blocks until every
      /// subscriber task has finished. A detected gap prints "FAIL!" and/or breaks into the
      /// debugger; a successful round prints ".".
      /// </remarks>
      public static void ConcurrentSubscribers()
      {
          const int producedCount = int.MaxValue;
          const int maxRoundsPerSubscriber = int.MaxValue;
          const int subscriberCount = 16;

          var subject = new ReplaySubject<int>(4);
          var seedSource = new Random();
          var subscribers = new List<Task>(subscriberCount);

          for (var i = 0; i < subscriberCount; i++)
          {
              // Every task gets its own Random, seeded on this thread, so that no Random
              // instance is shared between tasks.
              var workerRandom = new Random(seedSource.Next());

              // Task.Run unwraps the async delegate: the stored Task represents the whole
              // subscriber loop. Task.Factory.StartNew(async ...) would yield a Task<Task>
              // whose outer task completes at the first await, so WaitAll below would neither
              // wait for the loop nor observe exceptions thrown inside it.
              subscribers.Add(Task.Run(async () =>
              {
                  var takeCount = workerRandom.Next(10, 1000);

                  for (var round = 0; round < maxRoundsPerSubscriber; round++)
                  {
                      var received = new List<int>();

                      await subject.Take(takeCount).Scan((previous, current) =>
                      {
                          // Neighbouring items must differ by exactly one.
                          if (current - previous != 1)
                          {
                              Debugger.Break();
                          }

                          // A non-first item can never be 0 in an increasing sequence.
                          if (current == 0)
                          {
                              Debugger.Break();
                          }

                          return current;
                      }).ForEachAsync(received.Add);

                      // An empty round has nothing to verify (and no first element to read).
                      if (received.Count > 0)
                      {
                          var first = received[0];

                          if (!received.SequenceEqual(Enumerable.Range(first, received.Count)))
                          {
                              Console.WriteLine("FAIL!");
                              Debugger.Break();
                          }
                          else
                          {
                              Console.Write(".");
                          }
                      }

                      // Fewer items than requested means the sequence ended before Take was
                      // satisfied, i.e. the producer has completed the subject: stop
                      // resubscribing so this task can finish.
                      if (received.Count < takeCount)
                      {
                          break;
                      }

                      // Periodically yield for a while instead of resubscribing back-to-back.
                      if (round % 1000 == 0)
                      {
                          await Task.Delay(50);
                      }
                  }
              }));
          }

          for (var value = 0; value < producedCount; value++)
          {
              subject.OnNext(value);
          }

          // Signal the end of the stream so pending and future subscriptions terminate
          // (presumably after replaying whatever is still buffered) instead of waiting
          // forever for items that will never be produced.
          subject.OnCompleted();

          Console.WriteLine("Done!");

          // Blocks until every subscriber loop has ended; rethrows their failures, if any.
          Task.WaitAll(subscribers.ToArray());
      }
  }
  68. }
  69. #endif