| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- // Licensed to the .NET Foundation under one or more agreements.
- // The .NET Foundation licenses this file to you under the MIT License.
- // See the LICENSE file in the project root for more information.
- #if STRESS
- using System;
- using System.Collections.Generic;
- using System.Diagnostics;
- using System.Linq;
- using System.Reactive.Linq;
- using System.Reactive.Subjects;
- using System.Threading.Tasks;
- namespace ReactiveTests.Stress.Linq
- {
public class ReplaySubject
{
    /// <summary>
    /// Tests the ReplaySubject with concurrent subscribers.
    /// </summary>
    /// <remarks>
    /// One publisher (the calling thread) pushes consecutive integers into a
    /// replay subject with a buffer of 4, while 16 worker tasks repeatedly
    /// subscribe, take a random-length prefix and check that the values they
    /// saw are strictly consecutive. A violation prints "FAIL!" and/or breaks
    /// into an attached debugger.
    /// <para>
    /// The workers are started with <see cref="Task.Run(Func{Task})"/> so the
    /// tasks that are waited on represent the whole asynchronous loop. Worker
    /// exceptions therefore surface from <see cref="Task.WaitAll(Task[])"/>
    /// as an <see cref="AggregateException"/>.
    /// </para>
    /// </remarks>
    public static void ConcurrentSubscribers()
    {
        const int publishCount = int.MaxValue;
        const int maxRoundsPerSubscriber = int.MaxValue;
        const int subscriberCount = 16;

        var r = new ReplaySubject<int>(4);
        var rnd = new Random();
        var ts = new List<Task>(subscriberCount);

        for (var i = 0; i < subscriberCount; i++)
        {
            // Drawn on the calling thread: Random is not thread-safe, and each
            // worker only needs this single value.
            var n = rnd.Next(10, 1000);

            // Task.Run unwraps the async lambda. Task.Factory.StartNew would
            // hand back a Task<Task> whose outer task completes at the first
            // await, so WaitAll would not wait for (or observe faults from)
            // the actual loop.
            ts.Add(Task.Run(async () =>
            {
                for (var j = 0; j < maxRoundsPerSubscriber; j++)
                {
                    var xs = new List<int>();

                    await r.Take(n).Scan((x1, x2) =>
                    {
                        if (x2 - x1 != 1)
                        {
                            Debugger.Break();
                        }

                        if (x2 == 0)
                        {
                            Debugger.Break();
                        }

                        return x2;
                    }).ForEachAsync(xs.Add);

                    if (xs.Count == 0)
                    {
                        // The subject completed without replaying anything;
                        // there is nothing to validate and nothing more to come.
                        break;
                    }

                    var f = xs[0];

                    if (!xs.SequenceEqual(Enumerable.Range(f, xs.Count)))
                    {
                        Console.WriteLine("FAIL!");
                        Debugger.Break();
                    }
                    else
                    {
                        Console.Write(".");
                    }

                    if (xs.Count < n)
                    {
                        // Take(n) only ends early when the source completed,
                        // i.e. the publisher is done. Stop this subscriber.
                        break;
                    }

                    if (j % 1000 == 0)
                    {
                        await Task.Delay(50);
                    }
                }
            }));
        }

        for (var i = 0; i < publishCount; i++)
        {
            r.OnNext(i);
        }

        // Complete the subject so that subscribers blocked in Take(n) are
        // released and the worker loops can terminate; otherwise WaitAll
        // below would never return.
        r.OnCompleted();

        Console.WriteLine("Done!");

        Task.WaitAll(ts.ToArray());
    }
}
- }
- #endif
|