RegressionTest.cs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive;
  8. using System.Reactive.Concurrency;
  9. using System.Reactive.Linq;
  10. using System.Reactive.Subjects;
  11. using System.Threading;
  12. using Microsoft.Reactive.Testing;
  13. using Xunit;
  14. namespace ReactiveTests.Tests
  15. {
  16. public class RegressionTest : ReactiveTest
  17. {
  /// <summary>
  /// Runs <c>reps</c> inner observables through <c>Merge(3)</c>. Each inner observable records
  /// its index when it is subscribed to and completes from a task after a short random delay.
  /// The test asserts that the recorded indices appear in source order.
  /// </summary>
  [Fact]
  public void Bug_ConcurrentMerge()
  {
      const int reps = 1000;

      var source = Enumerable.Range(0, reps).ToObservable();
      var resultQueue = new System.Collections.Concurrent.ConcurrentQueue<int>();

      // System.Random is not thread-safe, and up to three completion tasks can be
      // running at the same time, so every call to Next goes through this gate.
      var r = new Random();
      var randomGate = new object();

      source.Select(i => Observable.Create<Unit>(o =>
      {
          // Record the order in which inner sequences get subscribed to.
          resultQueue.Enqueue(i);

          System.Threading.Tasks.Task.Factory.StartNew(
              () =>
              {
                  int delay;
                  lock (randomGate)
                  {
                      delay = r.Next(10);
                  }

                  Thread.Sleep(delay);
                  o.OnCompleted();
              });

          return () => { };
      })).Merge(3).ForEach(_ => { });

      // Comparing the sequences directly gives a more useful failure message
      // than wrapping SequenceEqual in Assert.True.
      Assert.Equal(Enumerable.Range(0, reps), resultQueue.ToArray());
  }
  /// <summary>
  /// Splits a hot sequence into 100-tick windows, prefixes every element with the index of
  /// its window, appends an "end" marker after each window and merges everything back
  /// together. Verifies the exact virtual-time position of every merged message.
  /// </summary>
  [Fact]
  public void Bug_1283()
  {
      var testScheduler = new TestScheduler();

      var input = testScheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(220, 2),
          OnNext(240, 3),
          OnNext(300, 4),
          OnNext(310, 5),
          OnCompleted<int>(350)
      );

      var observer = testScheduler.Start(() =>
      {
          var windows = input.Window(TimeSpan.FromTicks(100), testScheduler);

          var labelled = windows.Select((window, index) =>
          {
              // Marker emitted once the window itself has completed.
              var endMarker = Observable.Return(index.ToString() + " end", testScheduler);

              return window
                  .Select(value => index.ToString() + " " + value.ToString())
                  .Concat(endMarker);
          });

          return labelled.Merge();
      });

      observer.Messages.AssertEqual(
          OnNext(220, "0 2"),
          OnNext(240, "0 3"),
          OnNext(300, "0 4"),
          OnNext(301, "0 end"),
          OnNext(310, "1 5"),
          OnNext(351, "1 end"),
          OnCompleted<string>(351)
      );
  }
  /// <summary>
  /// Same pipeline shape as a windowed merge with "end" markers, but with 10-tick windows
  /// and a source that completes on the same tick as its last element. Verifies the exact
  /// virtual-time position of every merged message.
  /// </summary>
  [Fact]
  public void Bug_1261()
  {
      var testScheduler = new TestScheduler();

      var input = testScheduler.CreateHotObservable(
          OnNext(205, 1),
          OnNext(210, 2),
          OnNext(215, 3),
          OnNext(220, 4),
          OnNext(225, 5),
          OnNext(230, 6),
          OnCompleted<int>(230));

      var observer = testScheduler.Start(() =>
      {
          var windows = input.Window(TimeSpan.FromTicks(10), testScheduler);

          var labelled = windows.Select((window, index) =>
          {
              // Marker emitted once the window itself has completed.
              var endMarker = Observable.Return(index.ToString() + " end", testScheduler);

              return window
                  .Select(value => index.ToString() + " " + value.ToString())
                  .Concat(endMarker);
          });

          return labelled.Merge();
      });

      observer.Messages.AssertEqual(
          OnNext(205, "0 1"),
          OnNext(210, "0 2"),
          OnNext(211, "0 end"),
          OnNext(215, "1 3"),
          OnNext(220, "1 4"),
          OnNext(221, "1 end"),
          OnNext(225, "2 5"),
          OnNext(230, "2 6"),
          OnNext(231, "2 end"),
          OnCompleted<string>(231)
      );
  }
  /// <summary>
  /// Checks that the observable returned by <c>Observable.Start</c> cannot be cast to
  /// <c>ISubject&lt;int, int&gt;</c>.
  /// </summary>
  [Fact]
  public void Bug_1130()
  {
      IObservable<int> started = Observable.Start(() => 5);

      var asSubject = started as ISubject<int, int>;

      Assert.Null(asSubject);
  }
  97. #if !NO_THREAD
  /// <summary>
  /// Subscribes twice to a <c>GroupBy</c> over an endlessly repeating sequence and disposes
  /// each subscription shortly afterwards. The test passes when both <c>Dispose</c> calls return.
  /// </summary>
  [Fact]
  public void Bug_1286()
  {
      var endless = Observable
          .Return(new { Name = "test", Value = 0d }, DefaultScheduler.Instance)
          .Repeat();
      var byName = endless.GroupBy(item => item.Name, item => item.Value);

      var subscription = byName.Subscribe(_ => { });
      Thread.Sleep(1);
      // Original author's note: most of the time, this Dispose deadlocks.
      subscription.Dispose();

      subscription = byName.Subscribe(_ => { });
      Thread.Sleep(1);
      // Original author's note: if the first Dispose does not deadlock, this one always does.
      subscription.Dispose();
  }
  112. #endif
  /// <summary>
  /// Takes the first element of a sequence that never completes and checks that the
  /// <c>Finally</c> action has run by the time <c>First</c> returns.
  /// </summary>
  [Fact]
  public void Bug_1287()
  {
      var finallyRan = false;

      var first = Observable
          .Return(1, Scheduler.CurrentThread)
          .Concat(Observable.Never<int>())
          .Finally(() => { finallyRan = true; })
          .First();

      Assert.Equal(1, first);
      Assert.True(finallyRan);
  }
  121. #if !NO_THREAD
  /// <summary>
  /// Iterator used by <c>Bug_1333</c>. Before yielding its single element it takes the first
  /// value of <paramref name="s"/>, disposes that value on a separate thread, releases
  /// <paramref name="sema"/> from that thread, and waits for the thread to finish.
  /// </summary>
  /// <param name="s">Subject that supplies the disposable to dispose.</param>
  /// <param name="sema">Semaphore released once the disposable has been disposed.</param>
  /// <returns>A sequence containing the single value 1.</returns>
  private static IEnumerable<int> Bug_1333_Enumerable(AsyncSubject<IDisposable> s, Semaphore sema)
  {
      IDisposable target = s.First();

      var worker = new Thread(() =>
      {
          target.Dispose();
          sema.Release();
      });

      worker.Start();
      // Do not yield until the dispose attempt on the other thread has finished.
      worker.Join();

      yield return 1;
  }
  /// <summary>
  /// Hands a running enumerable-based subscription its own disposable through an
  /// <c>AsyncSubject</c>, so that the enumerable disposes the subscription from another thread
  /// while it is still being enumerated. The test passes when the semaphore is released.
  /// </summary>
  [Fact]
  public void Bug_1333()
  {
      var sema = new Semaphore(0, 1);
      var d = new AsyncSubject<IDisposable>();

      var e = Bug_1333_Enumerable(d, sema).ToObservable(DefaultScheduler.Instance).Subscribe();

      d.OnNext(e);
      d.OnCompleted();

      // Bounded wait: a regression should fail the test instead of hanging the whole run.
      // The limit is deliberately generous so a slow machine does not cause false failures.
      Assert.True(
          sema.WaitOne(TimeSpan.FromSeconds(30)),
          "Timed out waiting for the subscription to be disposed from the enumerator.");
  }
  141. #endif
  /// <summary>
  /// Throttles a hot sequence by 100 ticks and checks that the value still pending when the
  /// source completes is emitted at the completion time, followed by the completion itself.
  /// </summary>
  [Fact]
  public void Bug_1295_Completed()
  {
      var testScheduler = new TestScheduler();

      var input = testScheduler.CreateHotObservable(
          OnNext(300, 1),
          OnNext(350, 2),
          OnNext(500, 3),
          OnCompleted<int>(550)
      );

      var dueTime = TimeSpan.FromTicks(100);
      var observer = testScheduler.Start(() => input.Throttle(dueTime, testScheduler));

      observer.Messages.AssertEqual(
          OnNext(450, 2),
          OnNext(550, 3),
          OnCompleted<int>(550)
      );

      input.Subscriptions.AssertEqual(
          Subscribe(200, 550)
      );
  }
  /// <summary>
  /// Throttles a hot sequence by 100 ticks and checks that an error from the source is
  /// forwarded at the time it occurs, without emitting the value that was still pending.
  /// </summary>
  [Fact]
  public void Bug_1295_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var input = testScheduler.CreateHotObservable(
          OnNext(300, 1),
          OnNext(350, 2),
          OnNext(500, 3),
          OnError<int>(550, failure)
      );

      var dueTime = TimeSpan.FromTicks(100);
      var observer = testScheduler.Start(() => input.Throttle(dueTime, testScheduler));

      observer.Messages.AssertEqual(
          OnNext(450, 2),
          OnError<int>(550, failure)
      );

      input.Subscriptions.AssertEqual(
          Subscribe(200, 550)
      );
  }
  /// <summary>
  /// Checks that <c>Observable.Catch</c> called with no sources completes at subscription time.
  /// </summary>
  [Fact]
  public void Bug_1297_Catch_None()
  {
      var testScheduler = new TestScheduler();
      Func<IObservable<int>> create = () => Observable.Catch<int>();

      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          OnCompleted<int>(200)
      );
  }
  /// <summary>
  /// Checks that <c>Observable.OnErrorResumeNext</c> called with no sources completes at
  /// subscription time.
  /// </summary>
  [Fact]
  public void Bug_1297_OnErrorResumeNext_None()
  {
      var testScheduler = new TestScheduler();
      Func<IObservable<int>> create = () => Observable.OnErrorResumeNext<int>();

      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          OnCompleted<int>(200)
      );
  }
  /// <summary>
  /// Checks that <c>Observable.Catch</c> over a single failing source forwards that
  /// source's error.
  /// </summary>
  [Fact]
  public void Bug_1297_Catch_Single()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();
      var failing = Observable.Throw<int>(failure, testScheduler);

      var observer = testScheduler.Start(() => Observable.Catch(failing));

      observer.Messages.AssertEqual(
          OnError<int>(201, failure)
      );
  }
  /// <summary>
  /// Checks that <c>Observable.OnErrorResumeNext</c> over a single failing source swallows
  /// the error and completes instead.
  /// </summary>
  [Fact]
  public void Bug_1297_OnErrorResumeNext_Single()
  {
      var testScheduler = new TestScheduler();
      var failing = Observable.Throw<int>(new Exception(), testScheduler);

      var observer = testScheduler.Start(() => Observable.OnErrorResumeNext(failing));

      observer.Messages.AssertEqual(
          OnCompleted<int>(201)
      );
  }
  /// <summary>
  /// Checks that <c>Observable.Catch</c> over three failing sources surfaces only the error
  /// of the last source.
  /// </summary>
  [Fact]
  public void Bug_1297_Catch_Multi()
  {
      var testScheduler = new TestScheduler();

      // Only the last exception is expected to reach the observer, so it is the
      // only one that needs a name.
      var lastFailure = new Exception();
      var first = Observable.Throw<int>(new Exception(), testScheduler);
      var second = Observable.Throw<int>(new Exception(), testScheduler);
      var third = Observable.Throw<int>(lastFailure, testScheduler);

      var observer = testScheduler.Start(() => Observable.Catch(first, second, third));

      observer.Messages.AssertEqual(
          OnError<int>(203, lastFailure)
      );
  }
  /// <summary>
  /// Checks that <c>Observable.OnErrorResumeNext</c> over three failing sources swallows
  /// every error and completes after the last source.
  /// </summary>
  [Fact]
  public void Bug_1297_OnErrorResumeNext_Multi()
  {
      var testScheduler = new TestScheduler();

      var first = Observable.Throw<int>(new Exception(), testScheduler);
      var second = Observable.Throw<int>(new Exception(), testScheduler);
      var third = Observable.Throw<int>(new Exception(), testScheduler);

      var observer = testScheduler.Start(() => Observable.OnErrorResumeNext(first, second, third));

      observer.Messages.AssertEqual(
          OnCompleted<int>(203)
      );
  }
  /// <summary>
  /// Delays a hot sequence by 100 ticks and checks that an error from the source is
  /// forwarded at the time it occurs, dropping the values that were still waiting.
  /// </summary>
  [Fact]
  public void Bug_1380()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var input = testScheduler.CreateHotObservable(
          OnNext(220, 1),
          OnNext(250, 2),
          OnNext(270, 3),
          OnNext(290, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(360, 7),
          OnError<int>(380, failure)
      );

      var delay = TimeSpan.FromTicks(100);
      var observer = testScheduler.Start(() => input.Delay(delay, testScheduler));

      observer.Messages.AssertEqual(
          OnNext(320, 1),
          OnNext(350, 2),
          OnNext(370, 3),
          OnError<int>(380, failure)
      );

      input.Subscriptions.AssertEqual(
          Subscribe(200, 380)
      );
  }
  /// <summary>
  /// Checks that a <c>Finally</c> action placed upstream of <c>Take(5)</c> has run once the
  /// blocking <c>ForEach</c> returns.
  /// </summary>
  [Fact]
  public void Bug_1356()
  {
      var finallyRan = false;

      var firstFive = Observable
          .Range(0, 10)
          .Finally(() => { finallyRan = true; })
          .Take(5);

      firstFive.ForEach(_ => { });

      Assert.True(finallyRan);
  }
  /// <summary>
  /// Multicasts a hot sequence through a scheduler-bound <c>ReplaySubject</c>, connects at
  /// tick 200, subscribes at tick 300 and checks the times at which the replayed and live
  /// values reach the observer.
  /// </summary>
  [Fact]
  public void Bug_1381()
  {
      var testScheduler = new TestScheduler();

      var input = testScheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(110, 2),
          OnNext(250, 3),
          OnNext(270, 4),
          OnNext(280, 5),
          OnNext(301, 6),
          OnNext(302, 7),
          OnNext(400, 8),
          OnNext(401, 9),
          OnNext(510, 10)
      );

      var observer = testScheduler.CreateObserver<int>();

      // Assigned by the scheduled actions below, in virtual-time order.
      IConnectableObservable<int> replayed = null;
      IDisposable connection = null;
      IDisposable subscription = null;

      testScheduler.ScheduleAbsolute(100, () => { replayed = input.Multicast(new ReplaySubject<int>(testScheduler)); });
      testScheduler.ScheduleAbsolute(200, () => { connection = replayed.Connect(); });
      testScheduler.ScheduleAbsolute(300, () => { subscription = replayed.Subscribe(observer); });
      testScheduler.ScheduleAbsolute(500, () => { subscription.Dispose(); });
      testScheduler.ScheduleAbsolute(600, () => { connection.Dispose(); });

      testScheduler.Start();

      observer.Messages.AssertEqual(
          OnNext(301, 3),
          OnNext(302, 4),
          OnNext(303, 5),
          OnNext(304, 6),
          OnNext(305, 7),
          OnNext(401, 8),
          OnNext(402, 9)
      );

      input.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Calls <c>OnNext</c> on a synchronized subject (two-type-parameter overload) from inside
  /// its own observer and checks that each nested value is delivered only after the current
  /// callback has finished: 1, -1, 2, -2, 3, -3.
  /// </summary>
  [Fact]
  public void Reentrant_Subject1()
  {
      // Typed as ISubject<int, int> so the ISubject<TSource, TResult> overload is chosen.
      ISubject<int, int> inner = new Subject<int>();
      var synchronized = Subject.Synchronize(inner, Scheduler.Immediate);
      var seen = new List<int>();

      synchronized.Subscribe(value =>
      {
          seen.Add(value);

          if (value < 3)
          {
              // Re-entrant call from within the observer.
              synchronized.OnNext(value + 1);
          }

          seen.Add(-value);
      });

      synchronized.OnNext(1);

      seen.AssertEqual(1, -1, 2, -2, 3, -3);
  }
  /// <summary>
  /// Calls <c>OnNext</c> on a synchronized subject (single-type-parameter overload) from
  /// inside its own observer and checks that each nested value is delivered only after the
  /// current callback has finished: 1, -1, 2, -2, 3, -3.
  /// </summary>
  [Fact]
  public void Reentrant_Subject2()
  {
      var inner = new Subject<int>();
      var synchronized = Subject.Synchronize(inner, Scheduler.Immediate);
      var seen = new List<int>();

      synchronized.Subscribe(value =>
      {
          seen.Add(value);

          if (value < 3)
          {
              // Re-entrant call from within the observer.
              synchronized.OnNext(value + 1);
          }

          seen.Add(-value);
      });

      synchronized.OnNext(1);

      seen.AssertEqual(1, -1, 2, -2, 3, -3);
  }
  /// <summary>
  /// Merges a published sequence with itself and checks that each source value is observed
  /// twice before the next one: 1, 1, 2, 2, 3, 3.
  /// </summary>
  [Fact]
  public void Merge_Trampoline1()
  {
      var source = new[] { 1, 2, 3 }.ToObservable();
      var doubled = source.Publish(shared => shared.Merge(shared));
      var seen = new List<int>();

      doubled.Subscribe(value => seen.Add(value));

      seen.AssertEqual(1, 1, 2, 2, 3, 3);
  }
  /// <summary>
  /// Merges four copies of a published sequence and checks that each source value is
  /// observed four times before the next one.
  /// </summary>
  [Fact]
  public void Merge_Trampoline2()
  {
      var source = new[] { 1, 2, 3 }.ToObservable();
      var quadrupled = source.Publish(shared => Observable.Merge(shared, shared, shared, shared));
      var seen = new List<int>();

      quadrupled.Subscribe(value => seen.Add(value));

      seen.AssertEqual(1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3);
  }
  393. }
  394. }