RegressionTest.cs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive;
  8. using System.Reactive.Concurrency;
  9. using System.Reactive.Linq;
  10. using System.Reactive.Subjects;
  11. using System.Threading;
  12. using Microsoft.Reactive.Testing;
  13. using Microsoft.VisualStudio.TestTools.UnitTesting;
  14. using Assert = Xunit.Assert;
  15. namespace ReactiveTests.Tests
  16. {
  17. [TestClass]
  18. public class RegressionTest : ReactiveTest
  19. {
  // Merges 1000 inner sequences with a concurrency limit of three. Each inner sequence
  // records its index when it is subscribed to and completes from a task after a short
  // random delay. The test passes when the recorded indices are exactly 0..999 in order.
  [TestMethod]
  public void Bug_ConcurrentMerge()
  {
      const int reps = 1000;

      // System.Random is not thread-safe, so every delay is drawn up front on the test
      // thread instead of from the tasks, which can run concurrently.
      var r = new Random();
      var delays = new int[reps];
      for (var k = 0; k < reps; k++)
      {
          delays[k] = r.Next(10);
      }

      var source = Enumerable.Range(0, reps).ToObservable();
      var resultQueue = new System.Collections.Concurrent.ConcurrentQueue<int>();

      source.Select(i => Observable.Create<Unit>(o =>
      {
          // Record the order in which the inner sequences get subscribed to.
          resultQueue.Enqueue(i);

          var delay = delays[i];
          System.Threading.Tasks.Task.Factory.StartNew(
              () =>
              {
                  Thread.Sleep(delay);
                  o.OnCompleted();
              });

          // Nothing to clean up on unsubscribe.
          return () => { };
      })).Merge(3).ForEach(_ => { });

      // The queue can be compared directly; no intermediate lists are needed.
      Assert.True(Enumerable.Range(0, reps).SequenceEqual(resultQueue));
  }
  // Splits a hot source into 100-tick windows, prefixes every element with the index of
  // its window, appends an "<index> end" marker after each window, and verifies the
  // merged output, including the final completion at tick 351.
  [TestMethod]
  public void Bug_1283()
  {
      var scheduler = new TestScheduler();

      var xs = scheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(220, 2),
          OnNext(240, 3),
          OnNext(300, 4),
          OnNext(310, 5),
          OnCompleted<int>(350)
      );

      var windowLength = TimeSpan.FromTicks(100);

      var results = scheduler.Start(() =>
          xs.Window(windowLength, scheduler)
            .Select((window, index) =>
            {
                var label = index.ToString();
                var tagged = window.Select(value => label + " " + value.ToString());
                var endMarker = Observable.Return(label + " end", scheduler);
                return tagged.Concat(endMarker);
            })
            .Merge()
      );

      // The element recorded at tick 100 is absent from the expected output.
      results.Messages.AssertEqual(
          OnNext(220, "0 2"),
          OnNext(240, "0 3"),
          OnNext(300, "0 4"),
          OnNext(301, "0 end"),
          OnNext(310, "1 5"),
          OnNext(351, "1 end"),
          OnCompleted<string>(351)
      );
  }
  // Same shape as a windowed labelling query, here with 10-tick windows: every element is
  // prefixed with its window index and each window is followed by an "<index> end" marker.
  // The expected output covers three windows and completes at tick 231.
  [TestMethod]
  public void Bug_1261()
  {
      var scheduler = new TestScheduler();

      var xs = scheduler.CreateHotObservable(
          OnNext(205, 1),
          OnNext(210, 2),
          OnNext(215, 3),
          OnNext(220, 4),
          OnNext(225, 5),
          OnNext(230, 6),
          OnCompleted<int>(230));

      var windowLength = TimeSpan.FromTicks(10);

      var results = scheduler.Start(() =>
          xs.Window(windowLength, scheduler)
            .Select((window, index) =>
            {
                var label = index.ToString();
                var tagged = window.Select(value => label + " " + value.ToString());
                var endMarker = Observable.Return(label + " end", scheduler);
                return tagged.Concat(endMarker);
            })
            .Merge()
      );

      results.Messages.AssertEqual(
          OnNext(205, "0 1"),
          OnNext(210, "0 2"),
          OnNext(211, "0 end"),
          OnNext(215, "1 3"),
          OnNext(220, "1 4"),
          OnNext(221, "1 end"),
          OnNext(225, "2 5"),
          OnNext(230, "2 6"),
          OnNext(231, "2 end"),
          OnCompleted<string>(231)
      );
  }
  // The sequence returned by Observable.Start must not be castable to ISubject<int, int>.
  [TestMethod]
  public void Bug_1130()
  {
      var xs = Observable.Start(() => 5);

      var asSubject = xs as ISubject<int, int>;

      Assert.Null(asSubject);
  }
  99. #if !NO_THREAD
  // Subscribes to a GroupBy over an endlessly repeating sequence and disposes the
  // subscription about a millisecond later, twice in a row. There are no assertions:
  // the test passes simply by returning.
  [TestMethod]
  public void Bug_1286()
  {
      var element = new { Name = "test", Value = 0d };
      var infinite = Observable.Return(element, DefaultScheduler.Instance).Repeat();
      var grouped = infinite.GroupBy(item => item.Name, item => item.Value);

      // Per the original notes: the first Dispose deadlocks most of the time, and when it
      // does not, the second one always does.
      for (var attempt = 0; attempt < 2; attempt++)
      {
          var subscription = grouped.Subscribe(_ => { });
          Thread.Sleep(1);
          subscription.Dispose();
      }
  }
  114. #endif
  // Takes the first element of a sequence that emits 1 and then never terminates, and
  // checks that the Finally action has run by the time First returns.
  [TestMethod]
  public void Bug_1287()
  {
      var finallyRan = false;

      var neverEnding = Observable.Return(1, Scheduler.CurrentThread).Concat(Observable.Never<int>());
      var guarded = neverEnding.Finally(() => { finallyRan = true; });
      var first = guarded.First();

      Assert.Equal(1, first);
      Assert.True(finallyRan);
  }
  123. #if !NO_THREAD
  // Iterator that waits for the disposable published through the subject, disposes it on
  // a separate thread, releases the semaphore from that thread, and only then yields its
  // single element.
  private static IEnumerable<int> Bug_1333_Enumerable(AsyncSubject<IDisposable> s, Semaphore sema)
  {
      // Blocks until the subject has produced its value.
      var subscription = s.First();

      ThreadStart disposeAndSignal = () =>
      {
          subscription.Dispose();
          sema.Release();
      };

      var worker = new Thread(disposeAndSignal);
      worker.Start();
      worker.Join();

      yield return 1;
  }
  // Hands a subscription to the enumerable that is producing it (through an AsyncSubject)
  // so the enumerable can dispose its own subscription from another thread, then waits
  // until that thread signals the semaphore. The test passes by returning.
  [TestMethod]
  //[Timeout(1000)]
  public void Bug_1333()
  {
      // Semaphore wraps an operating-system handle; dispose it once the wait is over.
      using (var sema = new Semaphore(0, 1))
      {
          var d = new AsyncSubject<IDisposable>();
          var e = Bug_1333_Enumerable(d, sema).ToObservable(DefaultScheduler.Instance).Subscribe();

          // Publish the subscription; the subject delivers it once completed.
          d.OnNext(e);
          d.OnCompleted();

          // No timeout: if the signal never arrives this call blocks indefinitely.
          sema.WaitOne();
      }
  }
  143. #endif
  // Throttle with a 100-tick due time over a source that completes at tick 550.
  // Expected: 2 at tick 450, then 3 together with the completion at tick 550, and a
  // single source subscription from tick 200 to tick 550.
  [TestMethod]
  public void Bug_1295_Completed()
  {
      var scheduler = new TestScheduler();
      var dueTime = TimeSpan.FromTicks(100);

      var xs = scheduler.CreateHotObservable(
          OnNext(300, 1),
          OnNext(350, 2),
          OnNext(500, 3),
          OnCompleted<int>(550)
      );

      var results = scheduler.Start(() => xs.Throttle(dueTime, scheduler));

      results.Messages.AssertEqual(
          OnNext(450, 2),
          OnNext(550, 3),
          OnCompleted<int>(550)
      );

      xs.Subscriptions.AssertEqual(Subscribe(200, 550));
  }
  // Throttle with a 100-tick due time over a source that fails at tick 550.
  // Expected: 2 at tick 450, then the error at tick 550 (the pending 3 is not emitted),
  // and a single source subscription from tick 200 to tick 550.
  [TestMethod]
  public void Bug_1295_Error()
  {
      var scheduler = new TestScheduler();
      var dueTime = TimeSpan.FromTicks(100);
      var ex = new Exception();

      var xs = scheduler.CreateHotObservable(
          OnNext(300, 1),
          OnNext(350, 2),
          OnNext(500, 3),
          OnError<int>(550, ex)
      );

      var results = scheduler.Start(() => xs.Throttle(dueTime, scheduler));

      results.Messages.AssertEqual(
          OnNext(450, 2),
          OnError<int>(550, ex)
      );

      xs.Subscriptions.AssertEqual(Subscribe(200, 550));
  }
  // Catch with no source sequences is expected to complete at tick 200 without emitting.
  [TestMethod]
  public void Bug_1297_Catch_None()
  {
      var scheduler = new TestScheduler();

      Func<IObservable<int>> create = () => Observable.Catch<int>();
      var results = scheduler.Start(create);

      results.Messages.AssertEqual(OnCompleted<int>(200));
  }
  // OnErrorResumeNext with no source sequences is expected to complete at tick 200
  // without emitting.
  [TestMethod]
  public void Bug_1297_OnErrorResumeNext_None()
  {
      var scheduler = new TestScheduler();

      Func<IObservable<int>> create = () => Observable.OnErrorResumeNext<int>();
      var results = scheduler.Start(create);

      results.Messages.AssertEqual(OnCompleted<int>(200));
  }
  // Catch over a single failing sequence is expected to surface that sequence's own
  // exception, at tick 201.
  [TestMethod]
  public void Bug_1297_Catch_Single()
  {
      var scheduler = new TestScheduler();
      var failure = new Exception();
      var xs = Observable.Throw<int>(failure, scheduler);

      var results = scheduler.Start(() => Observable.Catch(xs));

      results.Messages.AssertEqual(OnError<int>(201, failure));
  }
  // OnErrorResumeNext over a single failing sequence is expected to swallow the error and
  // complete at tick 201.
  [TestMethod]
  public void Bug_1297_OnErrorResumeNext_Single()
  {
      var scheduler = new TestScheduler();
      var failure = new Exception();
      var xs = Observable.Throw<int>(failure, scheduler);

      var results = scheduler.Start(() => Observable.OnErrorResumeNext(xs));

      results.Messages.AssertEqual(OnCompleted<int>(201));
  }
  // Catch over three failing sequences is expected to surface only the exception of the
  // last one, at tick 203.
  [TestMethod]
  public void Bug_1297_Catch_Multi()
  {
      var scheduler = new TestScheduler();

      var firstError = new Exception();
      var secondError = new Exception();
      var lastError = new Exception();

      var xs = Observable.Throw<int>(firstError, scheduler);
      var ys = Observable.Throw<int>(secondError, scheduler);
      var zs = Observable.Throw<int>(lastError, scheduler);

      var results = scheduler.Start(() => Observable.Catch(xs, ys, zs));

      results.Messages.AssertEqual(OnError<int>(203, lastError));
  }
  // OnErrorResumeNext over three failing sequences is expected to swallow every error and
  // complete at tick 203.
  [TestMethod]
  public void Bug_1297_OnErrorResumeNext_Multi()
  {
      var scheduler = new TestScheduler();

      // Each source fails with its own, distinct exception instance.
      var xs = Observable.Throw<int>(new Exception(), scheduler);
      var ys = Observable.Throw<int>(new Exception(), scheduler);
      var zs = Observable.Throw<int>(new Exception(), scheduler);

      var results = scheduler.Start(() => Observable.OnErrorResumeNext(xs, ys, zs));

      results.Messages.AssertEqual(OnCompleted<int>(203));
  }
  // Delay by 100 ticks over a source that fails at tick 380.
  // Expected: the first three elements arrive 100 ticks late, the error arrives at
  // tick 380 itself, and the elements still pending at that point are never emitted.
  [TestMethod]
  public void Bug_1380()
  {
      var scheduler = new TestScheduler();
      var delay = TimeSpan.FromTicks(100);
      var ex = new Exception();

      var xs = scheduler.CreateHotObservable(
          OnNext(220, 1),
          OnNext(250, 2),
          OnNext(270, 3),
          OnNext(290, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(360, 7),
          OnError<int>(380, ex)
      );

      var results = scheduler.Start(() => xs.Delay(delay, scheduler));

      results.Messages.AssertEqual(
          OnNext(320, 1),
          OnNext(350, 2),
          OnNext(370, 3),
          OnError<int>(380, ex)
      );

      xs.Subscriptions.AssertEqual(Subscribe(200, 380));
  }
  // Checks that a Finally action placed upstream of Take(5) has run once the sequence
  // has been consumed with ForEach.
  [TestMethod]
  public void Bug_1356()
  {
      var run = false;

      var guarded = Observable.Range(0, 10).Finally(() => { run = true; });
      guarded.Take(5).ForEach(_ => { });

      Assert.True(run);
  }
  // Multicasts a hot source through a scheduler-bound ReplaySubject. The multicast is
  // created at tick 100 and connected at 200; an observer subscribes at 300 and
  // unsubscribes at 500; the connection is disposed at 600. The expected messages show
  // the values delivered one tick apart from tick 301 onwards.
  [TestMethod]
  public void Bug_1381()
  {
      var scheduler = new TestScheduler();

      var xs = scheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(110, 2),
          OnNext(250, 3),
          OnNext(270, 4),
          OnNext(280, 5),
          OnNext(301, 6),
          OnNext(302, 7),
          OnNext(400, 8),
          OnNext(401, 9),
          OnNext(510, 10)
      );

      var results = scheduler.CreateObserver<int>();

      // Assigned from the scheduled actions below.
      IConnectableObservable<int> ys = null;
      IDisposable connection = null;
      IDisposable subscription = null;

      scheduler.ScheduleAbsolute(100, () => { ys = xs.Multicast(new ReplaySubject<int>(scheduler)); });
      scheduler.ScheduleAbsolute(200, () => { connection = ys.Connect(); });
      scheduler.ScheduleAbsolute(300, () => { subscription = ys.Subscribe(results); });
      scheduler.ScheduleAbsolute(500, () => { subscription.Dispose(); });
      scheduler.ScheduleAbsolute(600, () => { connection.Dispose(); });

      scheduler.Start();

      results.Messages.AssertEqual(
          OnNext(301, 3),
          OnNext(302, 4),
          OnNext(303, 5),
          OnNext(304, 6),
          OnNext(305, 7),
          OnNext(401, 8),
          OnNext(402, 9)
      );

      xs.Subscriptions.AssertEqual(Subscribe(200, 600));
  }
  // Pushes into a synchronized subject from inside its own observer. The subject is
  // passed as ISubject<int, int>. The expected list shows each value's handler finishing
  // before the next value is handled.
  [TestMethod]
  public void Reentrant_Subject1()
  {
      ISubject<int, int> inner = new Subject<int>();
      var subject = Subject.Synchronize(inner, Scheduler.Immediate);

      var recorded = new List<int>();

      subject.Subscribe(value =>
      {
          recorded.Add(value);

          // Re-entrant push while the current value is still being handled.
          if (value < 3)
          {
              subject.OnNext(value + 1);
          }

          recorded.Add(-value);
      });

      subject.OnNext(1);

      recorded.AssertEqual(1, -1, 2, -2, 3, -3);
  }
  // Pushes into a synchronized subject from inside its own observer. The subject is
  // passed as Subject<int>. The expected list shows each value's handler finishing
  // before the next value is handled.
  [TestMethod]
  public void Reentrant_Subject2()
  {
      var inner = new Subject<int>();
      var subject = Subject.Synchronize(inner, Scheduler.Immediate);

      var recorded = new List<int>();

      subject.Subscribe(value =>
      {
          recorded.Add(value);

          // Re-entrant push while the current value is still being handled.
          if (value < 3)
          {
              subject.OnNext(value + 1);
          }

          recorded.Add(-value);
      });

      subject.OnNext(1);

      recorded.AssertEqual(1, -1, 2, -2, 3, -3);
  }
  // Merges a published sequence with itself and expects every element to appear twice
  // in a row.
  [TestMethod]
  public void Merge_Trampoline1()
  {
      var source = new[] { 1, 2, 3 }.ToObservable();
      var ys = source.Publish(shared => shared.Merge(shared));

      var seen = new List<int>();
      ys.Subscribe(value => seen.Add(value));

      seen.AssertEqual(1, 1, 2, 2, 3, 3);
  }
  // Merges four references to the same published sequence and expects every element to
  // appear four times in a row.
  [TestMethod]
  public void Merge_Trampoline2()
  {
      var source = new[] { 1, 2, 3 }.ToObservable();
      var ys = source.Publish(shared => Observable.Merge(shared, shared, shared, shared));

      var seen = new List<int>();
      ys.Subscribe(value => seen.Add(value));

      seen.AssertEqual(1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3);
  }
  395. }
  396. }