// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.
#if STRESS
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
namespace ReactiveTests.Stress.Linq
{
/// <summary>
/// Stress tests for <see cref="System.Reactive.Subjects.ReplaySubject{T}"/>.
/// </summary>
public class ReplaySubject
{
    /// <summary>
    /// Tests the ReplaySubject with concurrent subscribers.
    /// </summary>
    /// <remarks>
    /// Sixteen subscriber tasks repeatedly subscribe to a shared subject and take a
    /// random number of elements while the calling thread publishes the integers
    /// 0 .. int.MaxValue - 1. Every batch a subscriber collects must be a run of
    /// consecutive integers; a violation prints "FAIL!" and breaks into the debugger.
    /// </remarks>
    public static void ConcurrentSubscribers()
    {
        var N = int.MaxValue; // number of values published by the calling thread
        var M = int.MaxValue; // number of subscribe/verify rounds per subscriber task

        // The explicit type argument matters: a bare `ReplaySubject` would bind to this
        // test class rather than to the Rx subject.
        var r = new ReplaySubject<int>(4);

        var rnd = new Random();
        var ts = new List<Task>();

        for (var i = 0; i < 16; i++)
        {
            // Each task gets its own Random instance, seeded from the shared one,
            // so no Random instance is used from more than one task.
            var rnd2 = new Random(rnd.Next());

            // NOTE(review): StartNew with an async lambda yields Task<Task>; the list
            // stores the outer task, which completes once the lambda first yields.
            // Task.WaitAll below therefore does not wait for the subscriber loops.
            // Unwrapping would make this method block, since the subject is never completed.
            ts.Add(Task.Factory.StartNew(async () =>
            {
                var n = rnd2.Next(10, 1000);

                for (var j = 0; j < M; j++)
                {
                    var xs = new List<int>();

                    await r.Take(n).Scan((x1, x2) =>
                    {
                        // Adjacent elements seen by one subscription must differ by exactly 1.
                        if (x2 - x1 != 1)
                        {
                            Debugger.Break();
                        }

                        if (x2 == 0)
                        {
                            Debugger.Break();
                        }

                        return x2;
                    }).ForEachAsync(xs.Add);

                    // The collected batch must equal the consecutive range starting at its first element.
                    var f = xs.First();
                    if (!xs.SequenceEqual(Enumerable.Range(f, xs.Count)))
                    {
                        Console.WriteLine("FAIL!");
                        Debugger.Break();
                    }
                    else
                    {
                        Console.Write(".");
                    }

                    // Pause briefly every 1000 rounds.
                    if (j % 1000 == 0)
                    {
                        await Task.Delay(50);
                    }
                }
            }));
        }

        // Publisher: push consecutive integers while the subscribers come and go.
        for (var i = 0; i < N; i++)
        {
            r.OnNext(i);
        }

        Console.WriteLine("Done!");

        Task.WaitAll(ts.ToArray());
    }
}
}
#endif