ReplaySubject.cs 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if STRESS
  3. using System;
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Linq;
  7. using System.Reactive.Linq;
  8. using System.Reactive.Subjects;
  9. using System.Threading.Tasks;
  10. namespace ReactiveTests.Stress.Linq
  11. {
  12. public class ReplaySubject
  13. {
  14. /// <summary>
  15. /// Tests the ReplaySubject with concurrent subscribers.
  16. /// </summary>
  17. public static void ConcurrentSubscribers()
  18. {
  19. var N = int.MaxValue;
  20. var M = int.MaxValue;
  21. var r = new ReplaySubject<int>(4);
  22. var rnd = new Random();
  23. var ts = new List<Task>();
  24. for (var i = 0; i < 16; i++)
  25. {
  26. var rnd2 = new Random(rnd.Next());
  27. ts.Add(Task.Run(async () =>
  28. {
  29. var n = rnd2.Next(10, 1000);
  30. for (var j = 0; j < M; j++)
  31. {
  32. var xs = new List<int>();
  33. await r.Take(n).Scan((x1, x2) =>
  34. {
  35. if (x2 - x1 != 1)
  36. Debugger.Break();
  37. if (x2 == 0)
  38. Debugger.Break();
  39. return x2;
  40. }).ForEachAsync(xs.Add);
  41. var f = xs.First();
  42. if (!xs.SequenceEqual(Enumerable.Range(f, xs.Count)))
  43. {
  44. Console.WriteLine("FAIL!");
  45. Debugger.Break();
  46. }
  47. else
  48. {
  49. Console.Write(".");
  50. }
  51. if (j % 1000 == 0)
  52. {
  53. await Task.Delay(50);
  54. }
  55. }
  56. }));
  57. }
  58. for (var i = 0; i < N; i++)
  59. {
  60. r.OnNext(i);
  61. }
  62. Console.WriteLine("Done!");
  63. Task.WaitAll(ts.ToArray());
  64. }
  65. }
  66. }
  67. #endif