// FromEvent.cs
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if STRESS
  3. using System;
  4. using System.Collections.Generic;
  5. using System.Linq;
  6. using System.Reactive.Disposables;
  7. using System.Reactive.Linq;
  8. using System.Reflection;
  9. using System.Threading;
  10. namespace ReactiveTests.Stress.Linq
  11. {
  12. public class FromEvent
  13. {
  /// <summary>
  /// Random number generator shared by the randomized test variants.
  /// Wrapped in <see cref="Lazy{T}"/> so the instance is only created on first use.
  /// </summary>
  private static readonly Lazy<Random> s_rand = new Lazy<Random>();
  /// <summary>
  /// Stress scenario: many threads subscribe to a FromEventPattern sequence and dispose their
  /// subscriptions, while one consumer must keep receiving the sequence undisturbed.
  ///
  /// Runs a fixed mix of the RefCount_* tests (linear, exponential and random concurrency levels)
  /// for 100 rounds and reports the progress in the console title.
  /// </summary>
  public static void RefCount_Mix()
  {
      // Resolved once, inside this method, so the reported name is always this method's name.
      var titlePrefix = MethodInfo.GetCurrentMethod().Name + " - ";
      var fixedMessageCounts = new[] { 100, 1000, 10000, 100000 };
      var randomUpperBounds = new[] { 10, 100, 1000, 10000, 100000 };
      const int repeatCount = 10;

      Console.Title = titlePrefix + "0% complete";

      for (var round = 1; round <= 100; round++)
      {
          foreach (var msgCount in fixedMessageCounts)
          {
              // concurrency level {10, 20, ..., 100}
              RefCount_ConcurrencyLevel_Linear(msgCount, repeatCount, 10, 100, 10);

              // concurrency level {100, 200, ..., 1000}
              RefCount_ConcurrencyLevel_Linear(msgCount, repeatCount, 100, 1000, 100);

              // concurrency level {1, 2, 4, ..., 65536}
              RefCount_ConcurrencyLevel_Exponential(msgCount, repeatCount, 1, 65536, 2);
          }

          foreach (var maxMsgCount in randomUpperBounds)
          {
              foreach (var maxConcurrency in randomUpperBounds)
              {
                  RefCount_Rand(repeatCount, maxMsgCount, maxConcurrency);
              }
          }

          // The loop runs exactly 100 rounds, so the round number doubles as the percentage.
          Console.Title = titlePrefix + round + "% complete";
      }
  }
  /// <summary>
  /// Stress scenario: many threads subscribe to a FromEventPattern sequence and dispose their
  /// subscriptions, while one consumer must keep receiving the sequence undisturbed.
  /// Subscriptions are happening on the ThreadPool, possibly causing (expected) time gaps.
  ///
  /// Runs a fixed mix of the RefCountWithPost_* tests (linear, exponential and random concurrency
  /// levels) for 100 rounds and reports the progress in the console title.
  /// </summary>
  public static void RefCountWithPost_Mix()
  {
      // Resolved once, inside this method, so the reported name is always this method's name.
      var titlePrefix = MethodInfo.GetCurrentMethod().Name + " - ";
      var fixedMessageCounts = new[] { 100, 1000, 10000, 100000 };
      var randomUpperBounds = new[] { 10, 100, 1000, 10000, 100000 };
      const int repeatCount = 10;

      Console.Title = titlePrefix + "0% complete";

      var round = 1;
      while (round <= 100)
      {
          foreach (var msgCount in fixedMessageCounts)
          {
              // concurrency level {10, 20, ..., 100}
              RefCountWithPost_ConcurrencyLevel_Linear(msgCount, repeatCount, 10, 100, 10);

              // concurrency level {100, 200, ..., 1000}
              RefCountWithPost_ConcurrencyLevel_Linear(msgCount, repeatCount, 100, 1000, 100);

              // concurrency level {1, 2, 4, ..., 65536}
              RefCountWithPost_ConcurrencyLevel_Exponential(msgCount, repeatCount, 1, 65536, 2);
          }

          foreach (var maxMsgCount in randomUpperBounds)
          {
              foreach (var maxConcurrency in randomUpperBounds)
              {
                  RefCountWithPost_Rand(repeatCount, maxMsgCount, maxConcurrency);
              }
          }

          // The loop runs exactly 100 rounds, so the round number doubles as the percentage.
          Console.Title = titlePrefix + round + "% complete";
          round++;
      }
  }
  /// <summary>
  /// Stress scenario: many threads subscribe to a FromEventPattern sequence and dispose their
  /// subscriptions, while one consumer must keep receiving the sequence undisturbed.
  ///
  /// The number of messages and the level of concurrency are picked at random for every iteration.
  /// </summary>
  /// <param name="n">Number of iterations.</param>
  /// <param name="maxN">Upper bound for the randomly chosen number of messages.</param>
  /// <param name="maxM">Upper bound for the randomly chosen level of concurrency.</param>
  public static void RefCount_Rand(int n, int maxN, int maxM)
  {
      IEnumerable<Tuple<int, int>> randomParameters = RefCount_Rand_Params(n, maxN, maxM);
      RefCount_(randomParameters);
  }
  /// <summary>
  /// Stress scenario: many threads subscribe to a FromEventPattern sequence and dispose their
  /// subscriptions, while one consumer must keep receiving the sequence undisturbed.
  /// Subscriptions are happening on the ThreadPool, possibly causing (expected) time gaps.
  ///
  /// The number of messages and the level of concurrency are picked at random for every iteration.
  /// </summary>
  /// <param name="n">Number of iterations.</param>
  /// <param name="maxN">Upper bound for the randomly chosen number of messages.</param>
  /// <param name="maxM">Upper bound for the randomly chosen level of concurrency.</param>
  public static void RefCountWithPost_Rand(int n, int maxN, int maxM)
  {
      IEnumerable<Tuple<int, int>> randomParameters = RefCount_Rand_Params(n, maxN, maxM);
      RefCountWithPost_(randomParameters);
  }
  /// <summary>
  /// Lazily produces <paramref name="n"/> random (N, M) pairs, where N is the number of messages
  /// and M is the level of concurrency for one test iteration.
  /// </summary>
  /// <param name="n">Number of pairs to produce.</param>
  /// <param name="maxN">Maximum number of messages (inclusive); must be at least 1.</param>
  /// <param name="maxM">Maximum level of concurrency (inclusive); must be at least 1.</param>
  /// <returns>Tuples with Item1 = N in [1, maxN] and Item2 = M in [1, maxM].</returns>
  /// <exception cref="ArgumentOutOfRangeException">
  /// Thrown on enumeration when <paramref name="maxN"/> or <paramref name="maxM"/> is less than 1.
  /// </exception>
  private static IEnumerable<Tuple<int, int>> RefCount_Rand_Params(int n, int maxN, int maxM)
  {
      if (maxN < 1)
      {
          throw new ArgumentOutOfRangeException("maxN", maxN, "The maximum number of messages must be at least 1.");
      }

      if (maxM < 1)
      {
          throw new ArgumentOutOfRangeException("maxM", maxM, "The maximum level of concurrency must be at least 1.");
      }

      var rand = s_rand.Value;

      for (int i = 0; i < n; i++)
      {
          // Random.Next excludes its upper bound; widen it by one so the documented
          // maximum can actually be drawn. N is drawn before M, as before.
          var N = rand.Next(1, ToExclusiveUpperBound(maxN));
          var M = rand.Next(1, ToExclusiveUpperBound(maxM));

          yield return new Tuple<int, int>(N, M);
      }
  }

  /// <summary>
  /// Converts an inclusive maximum into the exclusive bound expected by Random.Next,
  /// without overflowing when the maximum is already int.MaxValue.
  /// </summary>
  private static int ToExclusiveUpperBound(int inclusiveMax)
  {
      return inclusiveMax == int.MaxValue ? inclusiveMax : inclusiveMax + 1;
  }
  /// <summary>
  /// Stress scenario: many threads subscribe to a FromEventPattern sequence and dispose their
  /// subscriptions, while one consumer must keep receiving the sequence undisturbed.
  ///
  /// The level of concurrency grows in equal additive steps.
  /// </summary>
  /// <param name="N">Number of messages.</param>
  /// <param name="n">Number of iterations.</param>
  /// <param name="min">Minimum level of concurrency.</param>
  /// <param name="max">Maximum level of concurrency.</param>
  /// <param name="step">Additive step size to increase level of concurrency.</param>
  public static void RefCount_ConcurrencyLevel_Linear(int N, int n, int min, int max, int step)
  {
      IEnumerable<Tuple<int, int>> linearParameters = RefCount_ConcurrencyLevel_Linear_Params(N, n, min, max, step);
      RefCount_(linearParameters);
  }
  /// <summary>
  /// Stress scenario: many threads subscribe to a FromEventPattern sequence and dispose their
  /// subscriptions, while one consumer must keep receiving the sequence undisturbed.
  /// Subscriptions are happening on the ThreadPool, possibly causing (expected) time gaps.
  ///
  /// The level of concurrency grows in equal additive steps.
  /// </summary>
  /// <param name="N">Number of messages.</param>
  /// <param name="n">Number of iterations.</param>
  /// <param name="min">Minimum level of concurrency.</param>
  /// <param name="max">Maximum level of concurrency.</param>
  /// <param name="step">Additive step size to increase level of concurrency.</param>
  public static void RefCountWithPost_ConcurrencyLevel_Linear(int N, int n, int min, int max, int step)
  {
      IEnumerable<Tuple<int, int>> linearParameters = RefCount_ConcurrencyLevel_Linear_Params(N, n, min, max, step);
      RefCountWithPost_(linearParameters);
  }
  /// <summary>
  /// Produces the (N, M) pairs for a linear concurrency sweep: for each of the
  /// <paramref name="n"/> iterations, M runs over min, min + step, ... up to and including max.
  /// </summary>
  /// <param name="N">Number of messages; copied into Item1 of every pair.</param>
  /// <param name="n">Number of times the whole sweep is repeated.</param>
  /// <param name="min">First level of concurrency.</param>
  /// <param name="max">Largest level of concurrency that may be produced.</param>
  /// <param name="step">Additive step size; must be positive.</param>
  /// <returns>A lazily evaluated sequence of (N, M) tuples.</returns>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="step"/> is zero or negative.</exception>
  private static IEnumerable<Tuple<int, int>> RefCount_ConcurrencyLevel_Linear_Params(int N, int n, int min, int max, int step)
  {
      // Validated eagerly (this method is not an iterator) so a bad step fails at the call site
      // instead of turning the sweep into a loop that never terminates.
      if (step <= 0)
      {
          throw new ArgumentOutOfRangeException("step", step, "The additive step must be positive.");
      }

      return RefCount_ConcurrencyLevel_Linear_ParamsIterator(N, n, min, max, step);
  }

  /// <summary>
  /// Iterator behind RefCount_ConcurrencyLevel_Linear_Params; expects an already validated step.
  /// </summary>
  private static IEnumerable<Tuple<int, int>> RefCount_ConcurrencyLevel_Linear_ParamsIterator(int N, int n, int min, int max, int step)
  {
      for (int i = 0; i < n; i++)
      {
          // 64-bit counter: with an int counter, M + step wraps around to a negative value when
          // max is close to int.MaxValue and the loop would never end.
          for (long M = min; M <= max; M += step)
          {
              yield return new Tuple<int, int>(N, (int)M);
          }
      }
  }
  /// <summary>
  /// Stress scenario: many threads subscribe to a FromEventPattern sequence and dispose their
  /// subscriptions, while one consumer must keep receiving the sequence undisturbed.
  ///
  /// The level of concurrency grows by a constant factor.
  /// </summary>
  /// <param name="N">Number of messages.</param>
  /// <param name="n">Number of iterations.</param>
  /// <param name="min">Minimum level of concurrency.</param>
  /// <param name="max">Maximum level of concurrency.</param>
  /// <param name="step">Multiplicative step size to increase level of concurrency.</param>
  public static void RefCount_ConcurrencyLevel_Exponential(int N, int n, int min, int max, int step)
  {
      IEnumerable<Tuple<int, int>> exponentialParameters = RefCount_ConcurrencyLevel_Exponential_Params(N, n, min, max, step);
      RefCount_(exponentialParameters);
  }
  /// <summary>
  /// Stress scenario: many threads subscribe to a FromEventPattern sequence and dispose their
  /// subscriptions, while one consumer must keep receiving the sequence undisturbed.
  /// Subscriptions are happening on the ThreadPool, possibly causing (expected) time gaps.
  ///
  /// The level of concurrency grows by a constant factor.
  /// </summary>
  /// <param name="N">Number of messages.</param>
  /// <param name="n">Number of iterations.</param>
  /// <param name="min">Minimum level of concurrency.</param>
  /// <param name="max">Maximum level of concurrency.</param>
  /// <param name="step">Multiplicative step size to increase level of concurrency.</param>
  public static void RefCountWithPost_ConcurrencyLevel_Exponential(int N, int n, int min, int max, int step)
  {
      IEnumerable<Tuple<int, int>> exponentialParameters = RefCount_ConcurrencyLevel_Exponential_Params(N, n, min, max, step);
      RefCountWithPost_(exponentialParameters);
  }
  /// <summary>
  /// Produces the (N, M) pairs for an exponential concurrency sweep: for each of the
  /// <paramref name="n"/> iterations, M runs over min, min * step, min * step * step, ...
  /// for as long as it does not exceed max.
  /// </summary>
  /// <param name="N">Number of messages; copied into Item1 of every pair.</param>
  /// <param name="n">Number of times the whole sweep is repeated.</param>
  /// <param name="min">First level of concurrency; must be at least 1.</param>
  /// <param name="max">Largest level of concurrency that may be produced.</param>
  /// <param name="step">Multiplicative step size; must be at least 2.</param>
  /// <returns>A lazily evaluated sequence of (N, M) tuples.</returns>
  /// <exception cref="ArgumentOutOfRangeException">
  /// <paramref name="min"/> is less than 1 or <paramref name="step"/> is less than 2.
  /// </exception>
  private static IEnumerable<Tuple<int, int>> RefCount_ConcurrencyLevel_Exponential_Params(int N, int n, int min, int max, int step)
  {
      // Validated eagerly (this method is not an iterator). A start value below 1 or a factor
      // below 2 means repeated multiplication never grows past max, i.e. an endless sweep.
      if (min < 1)
      {
          throw new ArgumentOutOfRangeException("min", min, "The minimum level of concurrency must be at least 1.");
      }

      if (step < 2)
      {
          throw new ArgumentOutOfRangeException("step", step, "The multiplicative step must be at least 2.");
      }

      return RefCount_ConcurrencyLevel_Exponential_ParamsIterator(N, n, min, max, step);
  }

  /// <summary>
  /// Iterator behind RefCount_ConcurrencyLevel_Exponential_Params; expects validated arguments.
  /// </summary>
  private static IEnumerable<Tuple<int, int>> RefCount_ConcurrencyLevel_Exponential_ParamsIterator(int N, int n, int min, int max, int step)
  {
      for (int i = 0; i < n; i++)
      {
          // 64-bit counter: with an int counter, M * step overflows for a large max, wraps to a
          // negative value, reaches 0 and then stays there forever.
          for (long M = min; M <= max; M *= step)
          {
              yield return new Tuple<int, int>(N, (int)M);
          }
      }
  }
  /// <summary>
  /// Runs one stress iteration per (N, M) pair. M pairs of thread-pool work items subscribe to,
  /// and dispose a subscription on, a shared FromEventPattern sequence, while one extra observer
  /// records the N events raised from a dedicated thread. Writes "Panic!" and stops at the first
  /// pair for which that observer did not record exactly 0, 1, ..., N - 1 in order.
  /// </summary>
  /// <param name="parameters">Pairs of (N = number of messages, M = level of concurrency).</param>
  private static void RefCount_(IEnumerable<Tuple<int, int>> parameters)
  {
      foreach (var p in parameters)
      {
          var N = p.Item1;
          var M = p.Item2;

          Console.Write("N = {0}, M = {1} - ", N, M);

          var bar = new Bar();
          var foo = Observable.FromEventPattern<FooEventArgs>(h => { Console.Write("+"); bar.Foo += h; }, h => { bar.Foo -= h; Console.Write("-"); });

          var res = new List<int>();
          var n = 0;

          // With N == 0 no event is ever raised, so the event starts out signaled; otherwise the
          // wait further down could never complete.
          var e = new ManualResetEvent(N == 0);

          // Every work item signals once: M subscribers plus M disposers.
          var cd = new CountdownEvent(M * 2);

          for (int i = 0; i < M; i++)
          {
              // The subscribe and dispose work items are queued independently and may run in either
              // order. NOTE(review): this relies on SingleAssignmentDisposable tolerating Dispose
              // before the assignment - confirm against the Rx documentation.
              var f = new SingleAssignmentDisposable();

              ThreadPool.QueueUserWorkItem(_ =>
              {
                  f.Disposable = foo.Subscribe(__ => { Console.Write("!"); });
                  cd.Signal();
              });

              ThreadPool.QueueUserWorkItem(_ =>
              {
                  f.Dispose();
                  cd.Signal();
              });
          }

          Console.Write("{SB}");

          var d = foo.Subscribe(x =>
          {
              //Console.Write("&");

              if (++n == N)
                  e.Set();

              res.Add(x.EventArgs.Qux);
          });

          Console.Write("{SE}");

          var t = new Thread(() =>
          {
              Console.Write("{TB}");

              for (int i = 0; i < N; i++)
                  bar.OnFoo(i);

              Console.Write("{TE}");
          });

          t.Start();
          t.Join();

          cd.Wait();
          e.WaitOne();

          d.Dispose();

          // Release the wait handles of this iteration; this method may run for a very large number
          // of iterations. Done explicitly at this point rather than through `using`: only here is it
          // known that every work item has signaled and that the observer above is unsubscribed, so
          // nothing can touch the handles after they are disposed.
          cd.Dispose();
          e.Dispose();

          if (!res.SequenceEqual(Enumerable.Range(0, N)))
          {
              Console.WriteLine("Panic!");
              break;
          }

          Console.WriteLine(".");
      }
  }
  /// <summary>
  /// Runs one stress iteration per (N, M) pair on a dedicated worker thread that has a
  /// <c>MySyncCtx</c> installed as its SynchronizationContext. M pairs of thread-pool work items
  /// subscribe to, and dispose a subscription on, a shared FromEventPattern sequence. Meanwhile a
  /// raiser thread keeps firing events until one extra observer has seen at least one of them.
  /// Blocks until the worker thread has processed all pairs.
  /// </summary>
  /// <param name="parameters">
  /// Pairs of (N, M). Only M, the level of concurrency, drives the test; N is merely printed.
  /// </param>
  private static void RefCountWithPost_(IEnumerable<Tuple<int, int>> parameters)
  {
      var worker = new Thread(() =>
      {
          SynchronizationContext.SetSynchronizationContext(new MySyncCtx());

          foreach (var p in parameters)
          {
              var N = p.Item1;
              var M = p.Item2;

              Console.Write("N = {0}, M = {1} - ", N, M);

              var bar = new Bar();
              var foo = Observable.FromEventPattern<FooEventArgs>(h => { /*Console.Write("+");*/ bar.Foo += h; }, h => { bar.Foo -= h; /*Console.Write("-"); */});

              var e = new ManualResetEvent(false);

              // Every work item signals once: M subscribers plus M disposers.
              var cd = new CountdownEvent(M * 2);

              for (int i = 0; i < M; i++)
              {
                  // The subscribe and dispose work items are queued independently and may run in
                  // either order.
                  var f = new SingleAssignmentDisposable();

                  ThreadPool.QueueUserWorkItem(_ =>
                  {
                      f.Disposable = foo.Subscribe(__ => { /*Console.Write("!");*/ });
                      cd.Signal();
                  });

                  ThreadPool.QueueUserWorkItem(_ =>
                  {
                      f.Dispose();
                      cd.Signal();
                  });
              }

              var hasObserved = 0;

              Console.Write("{SB}");

              var d = foo.Subscribe(x =>
              {
                  //
                  // [on BARTDE-M6500 with CPU and RAM pressure]
                  //
                  // Up to 8K concurrent observers, we typically don't see a time gap (expected worst-case behavior).
                  // The code below uses an event to check the desired behavior of eventually tuning in to the event stream.
                  //
                  Console.Write("&" + x.EventArgs.Qux);
                  e.Set();
                  Interlocked.Exchange(ref hasObserved, 1);
              });

              Console.Write("{SE}");

              var t = new Thread(() =>
              {
                  Console.Write("{TB}");

                  // Keep raising events until the observer above reports that it has received one.
                  var i = 0;
                  while (Thread.VolatileRead(ref hasObserved) == 0)
                      bar.OnFoo(i++);

                  Console.Write("{TE}");
              });

              t.Start();
              t.Join();

              cd.Wait();
              e.WaitOne();

              d.Dispose();

              // Release the wait handles of this iteration; this method may run for a very large
              // number of iterations. Done explicitly at this point rather than through `using`: by
              // now every work item has signaled cd, and the raiser thread (the only source of
              // events, and hence of e.Set calls) has finished, so nothing can touch the handles
              // after they are disposed.
              cd.Dispose();
              e.Dispose();

              Console.WriteLine(".");
          }
      });

      worker.Start();
      worker.Join();
  }
  /// <summary>
  /// Minimal event source used by the tests: exposes a single Foo event and a method to raise it.
  /// </summary>
  class Bar
  {
      /// <summary>Raised by <see cref="OnFoo"/>.</summary>
      public event EventHandler<FooEventArgs> Foo;

      /// <summary>
      /// Raises <see cref="Foo"/> with an argument object whose Qux is <paramref name="x"/>.
      /// Does nothing when no handler is attached.
      /// </summary>
      /// <param name="x">Value to hand to the handlers.</param>
      public void OnFoo(int x)
      {
          // Work on a snapshot of the delegate: handlers are attached and detached from other
          // threads while events are being raised.
          EventHandler<FooEventArgs> handlers = Foo;
          if (handlers == null)
          {
              return;
          }

          var args = new FooEventArgs { Qux = x };
          handlers(this, args);
      }
  }
  /// <summary>
  /// Event data for the Foo event; carries a single integer.
  /// </summary>
  class FooEventArgs : EventArgs
  {
      private int _qux;

      /// <summary>Integer payload of the event.</summary>
      public int Qux
      {
          get { return _qux; }
          set { _qux = value; }
      }
  }
  /// <summary>
  /// SynchronizationContext whose Post hands every callback to the ThreadPool.
  /// </summary>
  class MySyncCtx : SynchronizationContext
  {
      /// <summary>
      /// Queues <paramref name="d"/> on the ThreadPool; it is invoked there with <paramref name="state"/>.
      /// </summary>
      /// <param name="d">Callback to run.</param>
      /// <param name="state">Argument passed to the callback.</param>
      public override void Post(SendOrPostCallback d, object state)
      {
          // The thread-pool state argument is unused; the caller's state is captured by the closure.
          WaitCallback invoke = ignored => d(state);
          ThreadPool.QueueUserWorkItem(invoke);
      }
  }
  338. }
  339. }
  340. #endif