RegressionTest.cs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive;
  8. using System.Reactive.Concurrency;
  9. using System.Reactive.Linq;
  10. using System.Reactive.Subjects;
  11. using System.Threading;
  12. using Microsoft.Reactive.Testing;
  13. using Microsoft.VisualStudio.TestTools.UnitTesting;
  14. using Assert = Xunit.Assert;
  15. namespace ReactiveTests.Tests
  16. {
  17. [TestClass]
  18. public class RegressionTest : ReactiveTest
  19. {
  /// <summary>
  /// Merges <c>reps</c> inner sequences with a maximum concurrency of 3 and checks that
  /// the inner sequences were subscribed to in source order, even though each one
  /// completes from a background task after a small random delay.
  /// </summary>
  [TestMethod]
  public void Bug_ConcurrentMerge()
  {
      const int reps = 1000;

      var source = Enumerable.Range(0, reps).ToObservable();
      var resultQueue = new System.Collections.Concurrent.ConcurrentQueue<int>();

      // System.Random is not thread-safe, so every delay is drawn up front on this
      // thread instead of calling Next() concurrently from the worker tasks.
      var r = new Random();
      var delays = new int[reps];
      for (var k = 0; k < reps; k++)
      {
          delays[k] = r.Next(10);
      }

      source.Select(i => Observable.Create<Unit>(o =>
      {
          // Record the order in which the inner sequences get subscribed to;
          // this is what the final assertion verifies.
          resultQueue.Enqueue(i);

          System.Threading.Tasks.Task.Factory.StartNew(
              () =>
              {
                  Thread.Sleep(delays[i]);
                  o.OnCompleted();
              });

          return () => { };
      })).Merge(3).ForEach(_ => { });

      // ForEach blocks until the merged sequence completes, so the queue is final here.
      Assert.True(Enumerable.Range(0, reps).SequenceEqual(resultQueue));
  }
  /// <summary>
  /// Splits a hot source into 100-tick time windows, tags every element with the index
  /// of its window, appends an "end" marker after each window, and verifies the merged
  /// output recorded by the test scheduler.
  /// </summary>
  [TestMethod]
  public void Bug_1283()
  {
      var scheduler = new TestScheduler();

      var xs = scheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(220, 2),
          OnNext(240, 3),
          OnNext(300, 4),
          OnNext(310, 5),
          OnCompleted<int>(350)
      );

      var results = scheduler.Start(() =>
      {
          var windows = xs.Window(TimeSpan.FromTicks(100), scheduler);

          var labelled = windows.Select((window, index) =>
          {
              // "<window index> <value>" for each element of the window...
              var items = window.Select(value => index.ToString() + " " + value.ToString());

              // ...followed by a single "<window index> end" marker.
              var marker = Observable.Return(index.ToString() + " end", scheduler);

              return items.Concat(marker);
          });

          return labelled.Merge();
      });

      results.Messages.AssertEqual(
          OnNext(220, "0 2"),
          OnNext(240, "0 3"),
          OnNext(300, "0 4"),
          OnNext(301, "0 end"),
          OnNext(310, "1 5"),
          OnNext(351, "1 end"),
          OnCompleted<string>(351)
      );
  }
  /// <summary>
  /// Splits a hot source into 10-tick time windows, tags every element with the index
  /// of its window, appends an "end" marker after each window, and verifies the merged
  /// output recorded by the test scheduler.
  /// </summary>
  [TestMethod]
  public void Bug_1261()
  {
      var scheduler = new TestScheduler();

      var xs = scheduler.CreateHotObservable(
          OnNext(205, 1),
          OnNext(210, 2),
          OnNext(215, 3),
          OnNext(220, 4),
          OnNext(225, 5),
          OnNext(230, 6),
          OnCompleted<int>(230));

      var results = scheduler.Start(() =>
      {
          var windows = xs.Window(TimeSpan.FromTicks(10), scheduler);

          var labelled = windows.Select((window, index) =>
          {
              // "<window index> <value>" for each element of the window...
              var items = window.Select(value => index.ToString() + " " + value.ToString());

              // ...followed by a single "<window index> end" marker.
              var marker = Observable.Return(index.ToString() + " end", scheduler);

              return items.Concat(marker);
          });

          return labelled.Merge();
      });

      results.Messages.AssertEqual(
          OnNext(205, "0 1"),
          OnNext(210, "0 2"),
          OnNext(211, "0 end"),
          OnNext(215, "1 3"),
          OnNext(220, "1 4"),
          OnNext(221, "1 end"),
          OnNext(225, "2 5"),
          OnNext(230, "2 6"),
          OnNext(231, "2 end"),
          OnCompleted<string>(231)
      );
  }
  /// <summary>
  /// Verifies that the sequence returned by <c>Observable.Start</c> cannot be cast
  /// to <c>ISubject&lt;int, int&gt;</c>.
  /// </summary>
  [TestMethod]
  public void Bug_1130()
  {
      var started = Observable.Start(() => 5);

      // A failed "as" cast yields null, which is what the test expects.
      var asSubject = started as ISubject<int, int>;

      Assert.Null(asSubject);
  }
  /// <summary>
  /// Subscribes twice in a row to a GroupBy over an endlessly repeating sequence and
  /// disposes each subscription shortly afterwards. The test passes when both
  /// Dispose calls return.
  /// </summary>
  [TestMethod]
  public void Bug_1286()
  {
      var element = new { Name = "test", Value = 0d };
      var infinite = Observable.Return(element, DefaultScheduler.Instance).Repeat();
      var grouped = infinite.GroupBy(item => item.Name, item => item.Value);

      // Two passes on purpose: per the original notes, the first Dispose deadlocked
      // most of the time, and whenever it did not, the second one always did.
      for (var attempt = 0; attempt < 2; attempt++)
      {
          var subscription = grouped.Subscribe(_ => { });
          Thread.Sleep(1);
          subscription.Dispose();
      }
  }
  /// <summary>
  /// Takes the first element of a sequence that never completes and verifies both the
  /// value and that the Finally action has run by the time First returns.
  /// </summary>
  [TestMethod]
  public void Bug_1287()
  {
      var flag = false;

      var source = Observable.Return(1, Scheduler.CurrentThread)
          .Concat(Observable.Never<int>())
          .Finally(() => { flag = true; });

      var x = source.First();

      Assert.Equal(1, x);
      Assert.True(flag);
  }
  /// <summary>
  /// Iterator used by <c>Bug_1333</c>. When enumerated, it blocks on the first value of
  /// <paramref name="s"/>, disposes that value on a separate thread, releases
  /// <paramref name="sema"/> from that thread, waits for the thread to finish, and
  /// then yields a single element.
  /// </summary>
  /// <param name="s">Subject that supplies the disposable to dispose.</param>
  /// <param name="sema">Semaphore released once the disposable has been disposed.</param>
  /// <returns>A lazily evaluated sequence containing the single value 1.</returns>
  private static IEnumerable<int> Bug_1333_Enumerable(AsyncSubject<IDisposable> s, Semaphore sema)
  {
      var subscription = s.First();

      var worker = new Thread(() =>
      {
          subscription.Dispose();
          sema.Release();
      });

      worker.Start();
      worker.Join();

      yield return 1;
  }
  /// <summary>
  /// Subscribes to an enumerable-backed observable on the default scheduler, hands the
  /// resulting subscription to the enumerable through an AsyncSubject, and waits until
  /// the enumerable signals that it has disposed that subscription from another thread.
  /// </summary>
  [TestMethod]
  //[Timeout(1000)]
  public void Bug_1333()
  {
      var sema = new Semaphore(0, 1);
      var handoff = new AsyncSubject<IDisposable>();

      var sequence = Bug_1333_Enumerable(handoff, sema).ToObservable(DefaultScheduler.Instance);
      var subscription = sequence.Subscribe();

      // Publish the subscription; an AsyncSubject only delivers its value on completion.
      handoff.OnNext(subscription);
      handoff.OnCompleted();

      // Blocks until the enumerable's worker thread has released the semaphore.
      sema.WaitOne();
  }
  /// <summary>
  /// Throttles a hot source by 100 ticks and verifies that the value still pending
  /// when the source completes is delivered at completion time, followed by OnCompleted.
  /// </summary>
  [TestMethod]
  public void Bug_1295_Completed()
  {
      var scheduler = new TestScheduler();

      var xs = scheduler.CreateHotObservable(
          OnNext(300, 1),
          OnNext(350, 2),
          OnNext(500, 3),
          OnCompleted<int>(550)
      );

      var dueTime = TimeSpan.FromTicks(100);
      var results = scheduler.Start(() => xs.Throttle(dueTime, scheduler));

      results.Messages.AssertEqual(
          OnNext(450, 2),
          OnNext(550, 3),
          OnCompleted<int>(550)
      );

      xs.Subscriptions.AssertEqual(Subscribe(200, 550));
  }
  /// <summary>
  /// Throttles a hot source by 100 ticks and verifies that an error from the source is
  /// forwarded at the time it occurs, without delivering the value that was still pending.
  /// </summary>
  [TestMethod]
  public void Bug_1295_Error()
  {
      var scheduler = new TestScheduler();
      var ex = new Exception();

      var xs = scheduler.CreateHotObservable(
          OnNext(300, 1),
          OnNext(350, 2),
          OnNext(500, 3),
          OnError<int>(550, ex)
      );

      var dueTime = TimeSpan.FromTicks(100);
      var results = scheduler.Start(() => xs.Throttle(dueTime, scheduler));

      results.Messages.AssertEqual(
          OnNext(450, 2),
          OnError<int>(550, ex)
      );

      xs.Subscriptions.AssertEqual(Subscribe(200, 550));
  }
  /// <summary>
  /// Verifies that <c>Observable.Catch</c> called with no sources completes
  /// at subscription time (tick 200).
  /// </summary>
  [TestMethod]
  public void Bug_1297_Catch_None()
  {
      var scheduler = new TestScheduler();

      var results = scheduler.Start(() =>
      {
          return Observable.Catch<int>();
      });

      var expected = OnCompleted<int>(200);
      results.Messages.AssertEqual(expected);
  }
  /// <summary>
  /// Verifies that <c>Observable.OnErrorResumeNext</c> called with no sources
  /// completes at subscription time (tick 200).
  /// </summary>
  [TestMethod]
  public void Bug_1297_OnErrorResumeNext_None()
  {
      var scheduler = new TestScheduler();

      var results = scheduler.Start(() =>
      {
          return Observable.OnErrorResumeNext<int>();
      });

      var expected = OnCompleted<int>(200);
      results.Messages.AssertEqual(expected);
  }
  /// <summary>
  /// Verifies that <c>Observable.Catch</c> over a single failing source
  /// propagates that source's error.
  /// </summary>
  [TestMethod]
  public void Bug_1297_Catch_Single()
  {
      var scheduler = new TestScheduler();
      var ex = new Exception();
      var xs = Observable.Throw<int>(ex, scheduler);

      var results = scheduler.Start(() =>
      {
          return Observable.Catch(xs);
      });

      var expected = OnError<int>(201, ex);
      results.Messages.AssertEqual(expected);
  }
  /// <summary>
  /// Verifies that <c>Observable.OnErrorResumeNext</c> over a single failing source
  /// completes instead of propagating the error.
  /// </summary>
  [TestMethod]
  public void Bug_1297_OnErrorResumeNext_Single()
  {
      var scheduler = new TestScheduler();
      var failure = new Exception();
      var xs = Observable.Throw<int>(failure, scheduler);

      var results = scheduler.Start(() =>
      {
          return Observable.OnErrorResumeNext(xs);
      });

      var expected = OnCompleted<int>(201);
      results.Messages.AssertEqual(expected);
  }
  /// <summary>
  /// Verifies that <c>Observable.Catch</c> over three failing sources ends with
  /// the error of the last source.
  /// </summary>
  [TestMethod]
  public void Bug_1297_Catch_Multi()
  {
      var scheduler = new TestScheduler();

      var ex1 = new Exception();
      var ex2 = new Exception();
      var ex3 = new Exception();

      var xs = Observable.Throw<int>(ex1, scheduler);
      var ys = Observable.Throw<int>(ex2, scheduler);
      var zs = Observable.Throw<int>(ex3, scheduler);

      var results = scheduler.Start(() =>
      {
          return Observable.Catch(xs, ys, zs);
      });

      // Only the final source's exception is expected to surface.
      var expected = OnError<int>(203, ex3);
      results.Messages.AssertEqual(expected);
  }
  /// <summary>
  /// Verifies that <c>Observable.OnErrorResumeNext</c> over three failing sources
  /// completes after the last source instead of propagating any error.
  /// </summary>
  [TestMethod]
  public void Bug_1297_OnErrorResumeNext_Multi()
  {
      var scheduler = new TestScheduler();

      var ex1 = new Exception();
      var ex2 = new Exception();
      var ex3 = new Exception();

      var xs = Observable.Throw<int>(ex1, scheduler);
      var ys = Observable.Throw<int>(ex2, scheduler);
      var zs = Observable.Throw<int>(ex3, scheduler);

      var results = scheduler.Start(() =>
      {
          return Observable.OnErrorResumeNext(xs, ys, zs);
      });

      var expected = OnCompleted<int>(203);
      results.Messages.AssertEqual(expected);
  }
  /// <summary>
  /// Delays a hot source by 100 ticks and verifies that an error from the source is
  /// forwarded at the time it occurs, dropping the values whose delayed delivery
  /// would have come later.
  /// </summary>
  [TestMethod]
  public void Bug_1380()
  {
      var scheduler = new TestScheduler();
      var ex = new Exception();

      var xs = scheduler.CreateHotObservable(
          OnNext(220, 1),
          OnNext(250, 2),
          OnNext(270, 3),
          OnNext(290, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(360, 7),
          OnError<int>(380, ex)
      );

      var dueTime = TimeSpan.FromTicks(100);
      var results = scheduler.Start(() => xs.Delay(dueTime, scheduler));

      results.Messages.AssertEqual(
          OnNext(320, 1),
          OnNext(350, 2),
          OnNext(370, 3),
          OnError<int>(380, ex)
      );

      xs.Subscriptions.AssertEqual(Subscribe(200, 380));
  }
  /// <summary>
  /// Verifies that a Finally action placed before Take(5) has run by the time
  /// ForEach returns.
  /// </summary>
  [TestMethod]
  public void Bug_1356()
  {
      var run = false;

      var firstFive = Observable.Range(0, 10)
          .Finally(() => { run = true; })
          .Take(5);

      firstFive.ForEach(_ => { });

      Assert.True(run);
  }
  /// <summary>
  /// Multicasts a hot source through a scheduler-bound ReplaySubject, connects at
  /// tick 200, subscribes at tick 300, and verifies the replayed and live values
  /// seen by the observer until its subscription is disposed at tick 500.
  /// </summary>
  [TestMethod]
  public void Bug_1381()
  {
      var scheduler = new TestScheduler();

      var xs = scheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(110, 2),
          OnNext(250, 3),
          OnNext(270, 4),
          OnNext(280, 5),
          OnNext(301, 6),
          OnNext(302, 7),
          OnNext(400, 8),
          OnNext(401, 9),
          OnNext(510, 10)
      );

      var results = scheduler.CreateObserver<int>();

      IConnectableObservable<int> ys = null;
      IDisposable connection = null;
      IDisposable subscription = null;

      // Timeline: create (100) -> connect (200) -> subscribe (300)
      //           -> unsubscribe (500) -> disconnect (600).
      scheduler.ScheduleAbsolute(100, () => { ys = xs.Multicast(new ReplaySubject<int>(scheduler)); });
      scheduler.ScheduleAbsolute(200, () => { connection = ys.Connect(); });
      scheduler.ScheduleAbsolute(300, () => { subscription = ys.Subscribe(results); });
      scheduler.ScheduleAbsolute(500, () => { subscription.Dispose(); });
      scheduler.ScheduleAbsolute(600, () => { connection.Dispose(); });

      scheduler.Start();

      results.Messages.AssertEqual(
          OnNext(301, 3),
          OnNext(302, 4),
          OnNext(303, 5),
          OnNext(304, 6),
          OnNext(305, 7),
          OnNext(401, 8),
          OnNext(402, 9)
      );

      xs.Subscriptions.AssertEqual(Subscribe(200, 600));
  }
  /// <summary>
  /// Feeds a synchronized subject (typed as <c>ISubject&lt;int, int&gt;</c>, on the
  /// immediate scheduler) from inside its own observer and verifies the order in
  /// which the observer sees the re-entrant values.
  /// </summary>
  [TestMethod]
  public void Reentrant_Subject1()
  {
      // The static type ISubject<int, int> selects the two-type-parameter Synchronize overload.
      ISubject<int, int> inner = new Subject<int>();
      var s = Subject.Synchronize(inner, Scheduler.Immediate);

      var list = new List<int>();

      s.Subscribe(x =>
      {
          list.Add(x);

          // Re-enter the subject from within its own callback for values below 3.
          var pushNext = x < 3;
          if (pushNext)
          {
              s.OnNext(x + 1);
          }

          list.Add(-x);
      });

      s.OnNext(1);

      list.AssertEqual(1, -1, 2, -2, 3, -3);
  }
  /// <summary>
  /// Feeds a synchronized <c>Subject&lt;int&gt;</c> (on the immediate scheduler) from
  /// inside its own observer and verifies the order in which the observer sees the
  /// re-entrant values.
  /// </summary>
  [TestMethod]
  public void Reentrant_Subject2()
  {
      // Kept as Subject<int> so overload resolution matches a direct "new Subject<int>()" argument.
      var inner = new Subject<int>();
      var s = Subject.Synchronize(inner, Scheduler.Immediate);

      var list = new List<int>();

      s.Subscribe(x =>
      {
          list.Add(x);

          // Re-enter the subject from within its own callback for values below 3.
          var pushNext = x < 3;
          if (pushNext)
          {
              s.OnNext(x + 1);
          }

          list.Add(-x);
      });

      s.OnNext(1);

      list.AssertEqual(1, -1, 2, -2, 3, -3);
  }
  /// <summary>
  /// Publishes a three-element sequence, merges the shared sequence with itself, and
  /// verifies that each element is observed twice before the next one.
  /// </summary>
  [TestMethod]
  public void Merge_Trampoline1()
  {
      var source = new[] { 1, 2, 3 }.ToObservable();
      var ys = source.Publish(xs => xs.Merge(xs));

      var list = new List<int>();
      ys.Subscribe(value => list.Add(value));

      list.AssertEqual(1, 1, 2, 2, 3, 3);
  }
  /// <summary>
  /// Publishes a three-element sequence, merges four copies of the shared sequence,
  /// and verifies that each element is observed four times before the next one.
  /// </summary>
  [TestMethod]
  public void Merge_Trampoline2()
  {
      var source = new[] { 1, 2, 3 }.ToObservable();
      var ys = source.Publish(xs => Observable.Merge(xs, xs, xs, xs));

      var list = new List<int>();
      ys.Subscribe(value => list.Add(value));

      list.AssertEqual(1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3);
  }
  391. }
  392. }