RegressionTest.cs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Reactive;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Linq;
  8. using System.Reactive.Subjects;
  9. using System.Threading;
  10. using Microsoft.Reactive.Testing;
  11. using Xunit;
  12. namespace ReactiveTests.Tests
  13. {
  14. public class RegressionTest : ReactiveTest
  15. {
  #if DESKTOPCLR40 || DESKTOPCLR45 || DESKTOPCLR46
  /// <summary>
  /// Merges 1000 inner sequences with a concurrency limit of 3. Each inner sequence records its
  /// index when it is subscribed to and completes from a task after a short random delay. The
  /// test then checks that the indices were recorded in source order.
  /// </summary>
  [Fact]
  public void Bug_ConcurrentMerge()
  {
      const int reps = 1000;

      var source = Enumerable.Range(0, reps).ToObservable();
      var resultQueue = new System.Collections.Concurrent.ConcurrentQueue<int>();

      // System.Random is not thread-safe, and the delay is drawn on tasks that may run
      // concurrently, so every call to Next is serialized through this gate.
      var gate = new object();
      var r = new Random();

      source.Select(i => Observable.Create<Unit>(o =>
      {
          // Recorded at subscription time; this is the order the assertion below checks.
          resultQueue.Enqueue(i);

          System.Threading.Tasks.Task.Factory.StartNew(
              () =>
              {
                  int delay;
                  lock (gate)
                  {
                      delay = r.Next(10);
                  }

                  Thread.Sleep(delay);
                  o.OnCompleted();
              });

          return () => { };
      })).Merge(3).ForEach(_ => { });

      Assert.True(Enumerable.Range(0, reps).SequenceEqual(resultQueue));
  }
  #endif
  /// <summary>
  /// Splits a hot source into 100-tick windows, tags every element with the index of its window,
  /// appends an "end" marker to each window and merges the windows back into one sequence.
  /// The merged output is compared against the recorded expectation.
  /// </summary>
  [Fact]
  public void Bug_1283()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(220, 2),
          OnNext(240, 3),
          OnNext(300, 4),
          OnNext(310, 5),
          OnCompleted<int>(350)
      );

      var observer = testScheduler.Start(() =>
      {
          // The query is built inside the factory, exactly where the scheduler invokes it.
          var windows = source.Window(TimeSpan.FromTicks(100), testScheduler);

          var tagged = windows.Select((window, index) =>
          {
              var endMarker = Observable.Return(index.ToString() + " end", testScheduler);
              return window.Select(value => index.ToString() + " " + value.ToString()).Concat(endMarker);
          });

          return tagged.Merge();
      });

      observer.Messages.AssertEqual(
          OnNext(220, "0 2"),
          OnNext(240, "0 3"),
          OnNext(300, "0 4"),
          OnNext(301, "0 end"),
          OnNext(310, "1 5"),
          OnNext(351, "1 end"),
          OnCompleted<string>(351)
      );
  }
  /// <summary>
  /// Splits a hot source into 10-tick windows, tags every element with the index of its window,
  /// appends an "end" marker to each window and merges the windows back into one sequence.
  /// The expectation has three windows, each emitting its "end" marker one tick after its last element.
  /// </summary>
  [Fact]
  public void Bug_1261()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(205, 1),
          OnNext(210, 2),
          OnNext(215, 3),
          OnNext(220, 4),
          OnNext(225, 5),
          OnNext(230, 6),
          OnCompleted<int>(230));

      var observer = testScheduler.Start(() =>
      {
          // The query is built inside the factory, exactly where the scheduler invokes it.
          var windows = source.Window(TimeSpan.FromTicks(10), testScheduler);

          var tagged = windows.Select((window, index) =>
          {
              var endMarker = Observable.Return(index.ToString() + " end", testScheduler);
              return window.Select(value => index.ToString() + " " + value.ToString()).Concat(endMarker);
          });

          return tagged.Merge();
      });

      observer.Messages.AssertEqual(
          OnNext(205, "0 1"),
          OnNext(210, "0 2"),
          OnNext(211, "0 end"),
          OnNext(215, "1 3"),
          OnNext(220, "1 4"),
          OnNext(221, "1 end"),
          OnNext(225, "2 5"),
          OnNext(230, "2 6"),
          OnNext(231, "2 end"),
          OnCompleted<string>(231)
      );
  }
  /// <summary>
  /// Checks that the sequence returned by Observable.Start cannot be cast to a subject of int.
  /// </summary>
  [Fact]
  public void Bug_1130()
  {
      var started = Observable.Start(() => 5);

      // "as" yields null when the object does not implement the interface.
      var asSubject = started as ISubject<int, int>;

      Assert.Null(asSubject);
  }
  #if !NO_THREAD
  /// <summary>
  /// Subscribes twice to a GroupBy over an endlessly repeating sequence and disposes each
  /// subscription after a 1 ms pause. The test passes when both Dispose calls return.
  /// </summary>
  [Fact]
  public void Bug_1286()
  {
      var endless = Observable.Return(new { Name = "test", Value = 0d }, DefaultScheduler.Instance).Repeat();
      var byName = endless.GroupBy(item => item.Name, item => item.Value);

      // Original notes: the first Dispose deadlocks most of the time;
      // when it does not, the second one always does.
      for (var attempt = 0; attempt < 2; attempt++)
      {
          var subscription = byName.Subscribe(_ => { });
          Thread.Sleep(1);
          subscription.Dispose();
      }
  }
  #endif
  /// <summary>
  /// Takes the first element of a sequence that yields 1 and then never terminates, and checks
  /// both that the value is 1 and that the Finally action has run by the time First returns.
  /// </summary>
  [Fact]
  public void Bug_1287()
  {
      var finallyRan = false;

      var source = Observable.Return(1, Scheduler.CurrentThread).Concat(Observable.Never<int>());
      var guarded = source.Finally(() => { finallyRan = true; });

      var first = guarded.First();

      Assert.Equal(1, first);
      Assert.True(finallyRan);
  }
  #if !SILVERLIGHTM7
  #if !NO_THREAD
  /// <summary>
  /// Iterator used by <see cref="Bug_1333"/>. On first enumeration it blocks until the subject
  /// delivers a disposable, disposes that disposable on a separate thread, releases the
  /// semaphore from that thread, waits for the thread to finish and then yields a single value.
  /// </summary>
  /// <param name="s">Subject that delivers the disposable to dispose.</param>
  /// <param name="sema">Semaphore released once the disposable has been disposed.</param>
  /// <returns>A sequence containing the single value 1.</returns>
  private static IEnumerable<int> Bug_1333_Enumerable(AsyncSubject<IDisposable> s, Semaphore sema)
  {
      var d = s.First();
      var t = new Thread(() => { d.Dispose(); sema.Release(); });
      t.Start();
      t.Join();
      yield return 1;
  }

  /// <summary>
  /// Subscribes to the helper iterator on the default scheduler, hands the resulting subscription
  /// back to the iterator through an AsyncSubject and waits until the iterator has disposed it.
  /// </summary>
  /// <remarks>
  /// The wait has no timeout: if the semaphore is never released, this test hangs rather than fails.
  /// </remarks>
  [Fact]
  public void Bug_1333()
  {
      // Semaphore wraps an OS wait handle; dispose it deterministically instead of leaking it.
      using (var sema = new Semaphore(0, 1))
      {
          var d = new AsyncSubject<IDisposable>();
          var e = Bug_1333_Enumerable(d, sema).ToObservable(DefaultScheduler.Instance).Subscribe();

          d.OnNext(e);
          d.OnCompleted();

          // Returns only after the helper thread has called Release, so the handle is no
          // longer needed once the using block exits.
          sema.WaitOne();
      }
  }
  #endif
  #endif
  /// <summary>
  /// Throttles a hot source with a 100-tick due time where the source completes while a value
  /// is still pending. The expectation is that the pending value is emitted at the completion
  /// time, immediately followed by the completion itself.
  /// </summary>
  [Fact]
  public void Bug_1295_Completed()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(300, 1),
          OnNext(350, 2),
          OnNext(500, 3),
          OnCompleted<int>(550)
      );

      var observer = testScheduler.Start(() =>
      {
          var dueTime = TimeSpan.FromTicks(100);
          return source.Throttle(dueTime, testScheduler);
      });

      observer.Messages.AssertEqual(
          OnNext(450, 2),
          OnNext(550, 3),
          OnCompleted<int>(550)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 550)
      );
  }
  /// <summary>
  /// Throttles a hot source with a 100-tick due time where the source fails while a value is
  /// still pending. The expectation is that the error is forwarded at the time it occurs and
  /// the pending value is not emitted.
  /// </summary>
  [Fact]
  public void Bug_1295_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(300, 1),
          OnNext(350, 2),
          OnNext(500, 3),
          OnError<int>(550, failure)
      );

      var observer = testScheduler.Start(() =>
      {
          var dueTime = TimeSpan.FromTicks(100);
          return source.Throttle(dueTime, testScheduler);
      });

      observer.Messages.AssertEqual(
          OnNext(450, 2),
          OnError<int>(550, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 550)
      );
  }
  /// <summary>
  /// Checks that Catch called without any source sequences completes as soon as it is subscribed to.
  /// </summary>
  [Fact]
  public void Bug_1297_Catch_None()
  {
      var testScheduler = new TestScheduler();

      Func<IObservable<int>> createQuery = () => Observable.Catch<int>();
      var observer = testScheduler.Start(createQuery);

      observer.Messages.AssertEqual(
          OnCompleted<int>(200)
      );
  }
  /// <summary>
  /// Checks that OnErrorResumeNext called without any source sequences completes as soon as it
  /// is subscribed to.
  /// </summary>
  [Fact]
  public void Bug_1297_OnErrorResumeNext_None()
  {
      var testScheduler = new TestScheduler();

      Func<IObservable<int>> createQuery = () => Observable.OnErrorResumeNext<int>();
      var observer = testScheduler.Start(createQuery);

      observer.Messages.AssertEqual(
          OnCompleted<int>(200)
      );
  }
  /// <summary>
  /// Checks that Catch over a single failing sequence forwards that sequence's error.
  /// </summary>
  [Fact]
  public void Bug_1297_Catch_Single()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();
      var failing = Observable.Throw<int>(failure, testScheduler);

      Func<IObservable<int>> createQuery = () => Observable.Catch(failing);
      var observer = testScheduler.Start(createQuery);

      observer.Messages.AssertEqual(
          OnError<int>(201, failure)
      );
  }
  /// <summary>
  /// Checks that OnErrorResumeNext over a single failing sequence completes instead of
  /// forwarding the error.
  /// </summary>
  [Fact]
  public void Bug_1297_OnErrorResumeNext_Single()
  {
      var testScheduler = new TestScheduler();
      var failing = Observable.Throw<int>(new Exception(), testScheduler);

      Func<IObservable<int>> createQuery = () => Observable.OnErrorResumeNext(failing);
      var observer = testScheduler.Start(createQuery);

      observer.Messages.AssertEqual(
          OnCompleted<int>(201)
      );
  }
  /// <summary>
  /// Checks that Catch over three failing sequences ends with the error of the last sequence.
  /// </summary>
  [Fact]
  public void Bug_1297_Catch_Multi()
  {
      var testScheduler = new TestScheduler();

      var firstFailure = new Exception();
      var secondFailure = new Exception();
      var lastFailure = new Exception();

      var first = Observable.Throw<int>(firstFailure, testScheduler);
      var second = Observable.Throw<int>(secondFailure, testScheduler);
      var last = Observable.Throw<int>(lastFailure, testScheduler);

      Func<IObservable<int>> createQuery = () => Observable.Catch(first, second, last);
      var observer = testScheduler.Start(createQuery);

      // Only the final sequence's error is expected to reach the observer.
      observer.Messages.AssertEqual(
          OnError<int>(203, lastFailure)
      );
  }
  /// <summary>
  /// Checks that OnErrorResumeNext over three failing sequences completes after the last
  /// sequence instead of forwarding any error.
  /// </summary>
  [Fact]
  public void Bug_1297_OnErrorResumeNext_Multi()
  {
      var testScheduler = new TestScheduler();

      var firstFailure = new Exception();
      var secondFailure = new Exception();
      var lastFailure = new Exception();

      var first = Observable.Throw<int>(firstFailure, testScheduler);
      var second = Observable.Throw<int>(secondFailure, testScheduler);
      var last = Observable.Throw<int>(lastFailure, testScheduler);

      Func<IObservable<int>> createQuery = () => Observable.OnErrorResumeNext(first, second, last);
      var observer = testScheduler.Start(createQuery);

      observer.Messages.AssertEqual(
          OnCompleted<int>(203)
      );
  }
  /// <summary>
  /// Delays a hot source by 100 ticks where the source fails at 380. The expectation is that
  /// the error is observed at 380 (not delayed) and that only the values whose delayed time
  /// falls before the error are delivered.
  /// </summary>
  [Fact]
  public void Bug_1380()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(220, 1),
          OnNext(250, 2),
          OnNext(270, 3),
          OnNext(290, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(360, 7),
          OnError<int>(380, failure)
      );

      var observer = testScheduler.Start(() =>
      {
          var delay = TimeSpan.FromTicks(100);
          return source.Delay(delay, testScheduler);
      });

      observer.Messages.AssertEqual(
          OnNext(320, 1),
          OnNext(350, 2),
          OnNext(370, 3),
          OnError<int>(380, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 380)
      );
  }
  /// <summary>
  /// Checks that a Finally action placed upstream of Take(5) has run by the time the blocking
  /// ForEach over the truncated sequence returns.
  /// </summary>
  [Fact]
  public void Bug_1356()
  {
      var finallyRan = false;

      var firstFive = Observable.Range(0, 10)
          .Finally(() => { finallyRan = true; })
          .Take(5);

      firstFive.ForEach(_ => { });

      Assert.True(finallyRan);
  }
  /// <summary>
  /// Multicasts a hot source through a scheduler-bound ReplaySubject: the multicast is created at
  /// 100, connected at 200, subscribed to at 300, unsubscribed at 500 and disconnected at 600.
  /// The recorded messages and the source subscription window are compared with the expectation.
  /// </summary>
  [Fact]
  public void Bug_1381()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(110, 2),
          OnNext(250, 3),
          OnNext(270, 4),
          OnNext(280, 5),
          OnNext(301, 6),
          OnNext(302, 7),
          OnNext(400, 8),
          OnNext(401, 9),
          OnNext(510, 10)
      );

      var observer = testScheduler.CreateObserver<int>();

      // Assigned by the scheduled actions below as virtual time advances.
      IConnectableObservable<int> multicast = null;
      IDisposable connection = null;
      IDisposable subscription = null;

      testScheduler.ScheduleAbsolute(100, () => { multicast = source.Multicast(new ReplaySubject<int>(testScheduler)); });
      testScheduler.ScheduleAbsolute(200, () => { connection = multicast.Connect(); });
      testScheduler.ScheduleAbsolute(300, () => { subscription = multicast.Subscribe(observer); });
      testScheduler.ScheduleAbsolute(500, () => { subscription.Dispose(); });
      testScheduler.ScheduleAbsolute(600, () => { connection.Dispose(); });

      testScheduler.Start();

      observer.Messages.AssertEqual(
          OnNext(301, 3),
          OnNext(302, 4),
          OnNext(303, 5),
          OnNext(304, 6),
          OnNext(305, 7),
          OnNext(401, 8),
          OnNext(402, 9)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Pushes values into a synchronized subject (two-type-argument ISubject overload) from inside
  /// its own OnNext handler. The expected order shows each handler invocation finishing before
  /// the next value is delivered.
  /// </summary>
  [Fact]
  public void Reentrant_Subject1()
  {
      // Typed as ISubject<int, int> so the same Synchronize overload is selected as with a cast.
      ISubject<int, int> inner = new Subject<int>();
      var synchronized = Subject.Synchronize(inner, Scheduler.Immediate);

      var observed = new List<int>();

      synchronized.Subscribe(value =>
      {
          observed.Add(value);

          // Reentrant call: pushes the next value while this handler is still running.
          if (value < 3)
          {
              synchronized.OnNext(value + 1);
          }

          observed.Add(-value);
      });

      synchronized.OnNext(1);

      observed.AssertEqual(1, -1, 2, -2, 3, -3);
  }
  /// <summary>
  /// Pushes values into a synchronized subject (single-type-argument overload) from inside its
  /// own OnNext handler. The expected order shows each handler invocation finishing before the
  /// next value is delivered.
  /// </summary>
  [Fact]
  public void Reentrant_Subject2()
  {
      var inner = new Subject<int>();
      var synchronized = Subject.Synchronize(inner, Scheduler.Immediate);

      var observed = new List<int>();

      synchronized.Subscribe(value =>
      {
          observed.Add(value);

          // Reentrant call: pushes the next value while this handler is still running.
          if (value < 3)
          {
              synchronized.OnNext(value + 1);
          }

          observed.Add(-value);
      });

      synchronized.OnNext(1);

      observed.AssertEqual(1, -1, 2, -2, 3, -3);
  }
  /// <summary>
  /// Publishes a three-element sequence and merges the shared sequence with itself. Each
  /// element is expected twice in a row, in source order.
  /// </summary>
  [Fact]
  public void Merge_Trampoline1()
  {
      var source = new[] { 1, 2, 3 }.ToObservable();
      var doubled = source.Publish(shared => shared.Merge(shared));

      var observed = new List<int>();
      doubled.Subscribe(value => observed.Add(value));

      observed.AssertEqual(1, 1, 2, 2, 3, 3);
  }
  /// <summary>
  /// Publishes a three-element sequence and merges four references to the shared sequence.
  /// Each element is expected four times in a row, in source order.
  /// </summary>
  [Fact]
  public void Merge_Trampoline2()
  {
      var source = new[] { 1, 2, 3 }.ToObservable();
      var quadrupled = source.Publish(shared => Observable.Merge(shared, shared, shared, shared));

      var observed = new List<int>();
      quadrupled.Subscribe(value => observed.Add(value));

      observed.AssertEqual(1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3);
  }
  393. }
  394. }