RegressionTest.cs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive;
  8. using System.Reactive.Concurrency;
  9. using System.Reactive.Linq;
  10. using System.Reactive.Subjects;
  11. using System.Threading;
  12. using Microsoft.Reactive.Testing;
  13. using Xunit;
  14. namespace ReactiveTests.Tests
  15. {
  16. public class RegressionTest : ReactiveTest
  17. {
  18. #if NET45 || NET46
  19. [Fact]
  20. public void Bug_ConcurrentMerge()
  21. {
  22. const int reps = 1000;
  23. var source = Enumerable.Range(0, reps).ToObservable();
  24. var resultQueue = new System.Collections.Concurrent.ConcurrentQueue<int>();
  25. var r = new Random();
  26. source.Select(i => Observable.Create<Unit>(o =>
  27. {
  28. resultQueue.Enqueue(i);
  29. System.Threading.Tasks.Task.Factory.StartNew(
  30. () =>
  31. {
  32. Thread.Sleep(r.Next(10));
  33. o.OnCompleted();
  34. });
  35. return () => { };
  36. })).Merge(3).ForEach(_ => { });
  37. Assert.True(Enumerable.Range(0, reps).ToList().SequenceEqual(resultQueue.ToList()));
  38. }
  39. #endif
  40. [Fact]
  41. public void Bug_1283()
  42. {
  43. var scheduler = new TestScheduler();
  44. var xs = scheduler.CreateHotObservable(
  45. OnNext(100, 1),
  46. OnNext(220, 2),
  47. OnNext(240, 3),
  48. OnNext(300, 4),
  49. OnNext(310, 5),
  50. OnCompleted<int>(350)
  51. );
  52. var results = scheduler.Start(() =>
  53. xs.Window(TimeSpan.FromTicks(100), scheduler).Select((x, i) => x.Select(y => i.ToString() + " " + y.ToString()).Concat(Observable.Return(i.ToString() + " end", scheduler))).Merge()
  54. );
  55. results.Messages.AssertEqual(
  56. OnNext(220, "0 2"),
  57. OnNext(240, "0 3"),
  58. OnNext(300, "0 4"),
  59. OnNext(301, "0 end"),
  60. OnNext(310, "1 5"),
  61. OnNext(351, "1 end"),
  62. OnCompleted<string>(351)
  63. );
  64. }
  65. [Fact]
  66. public void Bug_1261()
  67. {
  68. var scheduler = new TestScheduler();
  69. var xs = scheduler.CreateHotObservable(
  70. OnNext(205, 1),
  71. OnNext(210, 2),
  72. OnNext(215, 3),
  73. OnNext(220, 4),
  74. OnNext(225, 5),
  75. OnNext(230, 6),
  76. OnCompleted<int>(230));
  77. var results = scheduler.Start(() =>
  78. xs.Window(TimeSpan.FromTicks(10), scheduler).Select((x, i) => x.Select(y => i.ToString() + " " + y.ToString()).Concat(Observable.Return(i.ToString() + " end", scheduler))).Merge()
  79. );
  80. results.Messages.AssertEqual(
  81. OnNext(205, "0 1"),
  82. OnNext(210, "0 2"),
  83. OnNext(211, "0 end"),
  84. OnNext(215, "1 3"),
  85. OnNext(220, "1 4"),
  86. OnNext(221, "1 end"),
  87. OnNext(225, "2 5"),
  88. OnNext(230, "2 6"),
  89. OnNext(231, "2 end"),
  90. OnCompleted<string>(231)
  91. );
  92. }
  93. [Fact]
  94. public void Bug_1130()
  95. {
  96. var xs = Observable.Start(() => 5);
  97. Assert.Null(xs as ISubject<int, int>);
  98. }
  99. #if !NO_THREAD
  100. [Fact]
  101. public void Bug_1286()
  102. {
  103. var infinite = Observable.Return(new { Name = "test", Value = 0d }, DefaultScheduler.Instance).Repeat();
  104. var grouped = infinite.GroupBy(x => x.Name, x => x.Value);
  105. var disp = grouped.Subscribe(_ => { });
  106. Thread.Sleep(1);
  107. //most of the time, this deadlocks
  108. disp.Dispose();
  109. disp = grouped.Subscribe(_ => { });
  110. Thread.Sleep(1);
  111. //if the first doesn't this one always
  112. disp.Dispose();
  113. }
  114. #endif
  115. [Fact]
  116. public void Bug_1287()
  117. {
  118. var flag = false;
  119. var x = Observable.Return(1, Scheduler.CurrentThread).Concat(Observable.Never<int>()).Finally(() => flag = true).First();
  120. Assert.Equal(1, x);
  121. Assert.True(flag);
  122. }
  123. #if !SILVERLIGHTM7
  124. #if !NO_THREAD
  125. static IEnumerable<int> Bug_1333_Enumerable(AsyncSubject<IDisposable> s, Semaphore sema)
  126. {
  127. var d = s.First();
  128. var t = new Thread(() => { d.Dispose(); sema.Release(); });
  129. t.Start();
  130. t.Join();
  131. yield return 1;
  132. }
  133. #endif
  134. #if !NO_THREAD
  135. [Fact]
  136. //[Timeout(1000)]
  137. public void Bug_1333()
  138. {
  139. var sema = new Semaphore(0, 1);
  140. var d = new AsyncSubject<IDisposable>();
  141. var e = Bug_1333_Enumerable(d, sema).ToObservable(DefaultScheduler.Instance).Subscribe();
  142. d.OnNext(e);
  143. d.OnCompleted();
  144. sema.WaitOne();
  145. }
  146. #endif
  147. #endif
  148. [Fact]
  149. public void Bug_1295_Completed()
  150. {
  151. var scheduler = new TestScheduler();
  152. var xs = scheduler.CreateHotObservable(
  153. OnNext(300, 1),
  154. OnNext(350, 2),
  155. OnNext(500, 3),
  156. OnCompleted<int>(550)
  157. );
  158. var results = scheduler.Start(() =>
  159. xs.Throttle(TimeSpan.FromTicks(100), scheduler)
  160. );
  161. results.Messages.AssertEqual(
  162. OnNext(450, 2),
  163. OnNext(550, 3),
  164. OnCompleted<int>(550)
  165. );
  166. xs.Subscriptions.AssertEqual(
  167. Subscribe(200, 550)
  168. );
  169. }
  170. [Fact]
  171. public void Bug_1295_Error()
  172. {
  173. var scheduler = new TestScheduler();
  174. var ex = new Exception();
  175. var xs = scheduler.CreateHotObservable(
  176. OnNext(300, 1),
  177. OnNext(350, 2),
  178. OnNext(500, 3),
  179. OnError<int>(550, ex)
  180. );
  181. var results = scheduler.Start(() =>
  182. xs.Throttle(TimeSpan.FromTicks(100), scheduler)
  183. );
  184. results.Messages.AssertEqual(
  185. OnNext(450, 2),
  186. OnError<int>(550, ex)
  187. );
  188. xs.Subscriptions.AssertEqual(
  189. Subscribe(200, 550)
  190. );
  191. }
  192. [Fact]
  193. public void Bug_1297_Catch_None()
  194. {
  195. var scheduler = new TestScheduler();
  196. var results = scheduler.Start(() =>
  197. Observable.Catch<int>()
  198. );
  199. results.Messages.AssertEqual(
  200. OnCompleted<int>(200)
  201. );
  202. }
  203. [Fact]
  204. public void Bug_1297_OnErrorResumeNext_None()
  205. {
  206. var scheduler = new TestScheduler();
  207. var results = scheduler.Start(() =>
  208. Observable.OnErrorResumeNext<int>()
  209. );
  210. results.Messages.AssertEqual(
  211. OnCompleted<int>(200)
  212. );
  213. }
  214. [Fact]
  215. public void Bug_1297_Catch_Single()
  216. {
  217. var scheduler = new TestScheduler();
  218. var ex = new Exception();
  219. var xs = Observable.Throw<int>(ex, scheduler);
  220. var results = scheduler.Start(() =>
  221. Observable.Catch(xs)
  222. );
  223. results.Messages.AssertEqual(
  224. OnError<int>(201, ex)
  225. );
  226. }
  227. [Fact]
  228. public void Bug_1297_OnErrorResumeNext_Single()
  229. {
  230. var scheduler = new TestScheduler();
  231. var xs = Observable.Throw<int>(new Exception(), scheduler);
  232. var results = scheduler.Start(() =>
  233. Observable.OnErrorResumeNext(xs)
  234. );
  235. results.Messages.AssertEqual(
  236. OnCompleted<int>(201)
  237. );
  238. }
  239. [Fact]
  240. public void Bug_1297_Catch_Multi()
  241. {
  242. var scheduler = new TestScheduler();
  243. var ex1 = new Exception();
  244. var ex2 = new Exception();
  245. var ex3 = new Exception();
  246. var xs = Observable.Throw<int>(ex1, scheduler);
  247. var ys = Observable.Throw<int>(ex2, scheduler);
  248. var zs = Observable.Throw<int>(ex3, scheduler);
  249. var results = scheduler.Start(() =>
  250. Observable.Catch(xs, ys, zs)
  251. );
  252. results.Messages.AssertEqual(
  253. OnError<int>(203, ex3)
  254. );
  255. }
  256. [Fact]
  257. public void Bug_1297_OnErrorResumeNext_Multi()
  258. {
  259. var scheduler = new TestScheduler();
  260. var ex1 = new Exception();
  261. var ex2 = new Exception();
  262. var ex3 = new Exception();
  263. var xs = Observable.Throw<int>(ex1, scheduler);
  264. var ys = Observable.Throw<int>(ex2, scheduler);
  265. var zs = Observable.Throw<int>(ex3, scheduler);
  266. var results = scheduler.Start(() =>
  267. Observable.OnErrorResumeNext(xs, ys, zs)
  268. );
  269. results.Messages.AssertEqual(
  270. OnCompleted<int>(203)
  271. );
  272. }
  273. [Fact]
  274. public void Bug_1380()
  275. {
  276. var scheduler = new TestScheduler();
  277. var ex = new Exception();
  278. var xs = scheduler.CreateHotObservable(
  279. OnNext(220, 1),
  280. OnNext(250, 2),
  281. OnNext(270, 3),
  282. OnNext(290, 4),
  283. OnNext(310, 5),
  284. OnNext(340, 6),
  285. OnNext(360, 7),
  286. OnError<int>(380, ex)
  287. );
  288. var results = scheduler.Start(() =>
  289. xs.Delay(TimeSpan.FromTicks(100), scheduler)
  290. );
  291. results.Messages.AssertEqual(
  292. OnNext(320, 1),
  293. OnNext(350, 2),
  294. OnNext(370, 3),
  295. OnError<int>(380, ex)
  296. );
  297. xs.Subscriptions.AssertEqual(
  298. Subscribe(200, 380)
  299. );
  300. }
  301. [Fact]
  302. public void Bug_1356()
  303. {
  304. var run = false;
  305. Observable.Range(0, 10).Finally(() => run = true).Take(5).ForEach(_ => { });
  306. Assert.True(run);
  307. }
  308. [Fact]
  309. public void Bug_1381()
  310. {
  311. var scheduler = new TestScheduler();
  312. var xs = scheduler.CreateHotObservable(
  313. OnNext( 90, 1),
  314. OnNext(110, 2),
  315. OnNext(250, 3),
  316. OnNext(270, 4),
  317. OnNext(280, 5),
  318. OnNext(301, 6),
  319. OnNext(302, 7),
  320. OnNext(400, 8),
  321. OnNext(401, 9),
  322. OnNext(510, 10)
  323. );
  324. var results = scheduler.CreateObserver<int>();
  325. var ys = default(IConnectableObservable<int>);
  326. var connection = default(IDisposable);
  327. var subscription = default(IDisposable);
  328. scheduler.ScheduleAbsolute(100, () => ys = xs.Multicast(new ReplaySubject<int>(scheduler)));
  329. scheduler.ScheduleAbsolute(200, () => connection = ys.Connect());
  330. scheduler.ScheduleAbsolute(300, () => subscription = ys.Subscribe(results));
  331. scheduler.ScheduleAbsolute(500, () => subscription.Dispose());
  332. scheduler.ScheduleAbsolute(600, () => connection.Dispose());
  333. scheduler.Start();
  334. results.Messages.AssertEqual(
  335. OnNext(301, 3),
  336. OnNext(302, 4),
  337. OnNext(303, 5),
  338. OnNext(304, 6),
  339. OnNext(305, 7),
  340. OnNext(401, 8),
  341. OnNext(402, 9)
  342. );
  343. xs.Subscriptions.AssertEqual(
  344. Subscribe(200, 600)
  345. );
  346. }
  347. [Fact]
  348. public void Reentrant_Subject1()
  349. {
  350. var s = Subject.Synchronize((ISubject<int, int>)new Subject<int>(), Scheduler.Immediate);
  351. var list = new List<int>();
  352. s.Subscribe(
  353. x =>
  354. {
  355. list.Add(x);
  356. if (x < 3)
  357. s.OnNext(x + 1);
  358. list.Add(-x);
  359. });
  360. s.OnNext(1);
  361. list.AssertEqual(1, -1, 2, -2, 3, -3);
  362. }
  363. [Fact]
  364. public void Reentrant_Subject2()
  365. {
  366. var s = Subject.Synchronize(new Subject<int>(), Scheduler.Immediate);
  367. var list = new List<int>();
  368. s.Subscribe(
  369. x =>
  370. {
  371. list.Add(x);
  372. if (x < 3)
  373. s.OnNext(x + 1);
  374. list.Add(-x);
  375. });
  376. s.OnNext(1);
  377. list.AssertEqual(1, -1, 2, -2, 3, -3);
  378. }
  379. [Fact]
  380. public void Merge_Trampoline1()
  381. {
  382. var ys = new[] { 1, 2, 3 }.ToObservable().Publish(xs => xs.Merge(xs));
  383. var list = new List<int>();
  384. ys.Subscribe(list.Add);
  385. list.AssertEqual(1, 1, 2, 2, 3, 3);
  386. }
  387. [Fact]
  388. public void Merge_Trampoline2()
  389. {
  390. var ys = new[] { 1, 2, 3 }.ToObservable().Publish(xs => Observable.Merge(xs, xs, xs, xs));
  391. var list = new List<int>();
  392. ys.Subscribe(list.Add);
  393. list.AssertEqual(1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3);
  394. }
  395. }
  396. }