RegressionTest.cs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Reactive;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Linq;
  8. using System.Reactive.Subjects;
  9. using System.Threading;
  10. using Microsoft.Reactive.Testing;
  11. using Microsoft.VisualStudio.TestTools.UnitTesting;
  12. namespace ReactiveTests.Tests
  13. {
  14. [TestClass]
  15. public class RegressionTest : ReactiveTest
  16. {
  17. #if DESKTOPCLR40 || DESKTOPCLR45 || DESKTOPCLR46
  /// <summary>
  /// Pushes 1000 inner observables through <c>Merge(3)</c>. Each inner observable records its
  /// index when it is subscribed to and completes from a background task after a short random
  /// sleep. The test asserts that the indices were recorded in source order (0, 1, 2, ...).
  /// </summary>
  [TestMethod]
  public void Bug_ConcurrentMerge()
  {
      const int reps = 1000;

      var source = Enumerable.Range(0, reps).ToObservable();
      var resultQueue = new System.Collections.Concurrent.ConcurrentQueue<int>();

      // System.Random is not thread-safe and the delay is drawn on task threads,
      // so every call to Next is serialized through this gate.
      var random = new Random();
      var randomGate = new object();

      source.Select(i => Observable.Create<Unit>(o =>
      {
          // Recorded at subscription time; this is the order the assertion checks.
          resultQueue.Enqueue(i);

          System.Threading.Tasks.Task.Factory.StartNew(
              () =>
              {
                  int delay;
                  lock (randomGate)
                  {
                      delay = random.Next(10);
                  }

                  Thread.Sleep(delay);
                  o.OnCompleted();
              });

          return () => { };
      })).Merge(3).ForEach(_ => { });

      // ConcurrentQueue enumerates in FIFO order, so no intermediate list copies are needed.
      Assert.IsTrue(Enumerable.Range(0, reps).SequenceEqual(resultQueue));
  }
  38. #endif
  /// <summary>
  /// Splits a hot source into 100-tick windows, labels every element with the index of its
  /// window, appends an "end" marker to each window, merges the windows back together and
  /// verifies the resulting message timeline.
  /// </summary>
  [TestMethod]
  public void Bug_1283()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(220, 2),
          OnNext(240, 3),
          OnNext(300, 4),
          OnNext(310, 5),
          OnCompleted<int>(350)
      );

      var observer = testScheduler.Start(() =>
          source
              .Window(TimeSpan.FromTicks(100), testScheduler)
              .Select((window, index) =>
              {
                  // "<window index> <value>" for every element of the window...
                  var labelled = window.Select(value => index.ToString() + " " + value.ToString());

                  // ...followed by a single "<window index> end" marker.
                  var endMarker = Observable.Return(index.ToString() + " end", testScheduler);

                  return labelled.Concat(endMarker);
              })
              .Merge()
      );

      observer.Messages.AssertEqual(
          OnNext(220, "0 2"),
          OnNext(240, "0 3"),
          OnNext(300, "0 4"),
          OnNext(301, "0 end"),
          OnNext(310, "1 5"),
          OnNext(351, "1 end"),
          OnCompleted<string>(351)
      );
  }
  /// <summary>
  /// Splits a hot source into 10-tick windows, labels every element with the index of its
  /// window, appends an "end" marker to each window, merges the windows back together and
  /// verifies the resulting message timeline.
  /// </summary>
  [TestMethod]
  public void Bug_1261()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(205, 1),
          OnNext(210, 2),
          OnNext(215, 3),
          OnNext(220, 4),
          OnNext(225, 5),
          OnNext(230, 6),
          OnCompleted<int>(230)
      );

      var observer = testScheduler.Start(() =>
          source
              .Window(TimeSpan.FromTicks(10), testScheduler)
              .Select((window, index) =>
              {
                  // "<window index> <value>" for every element of the window...
                  var labelled = window.Select(value => index.ToString() + " " + value.ToString());

                  // ...followed by a single "<window index> end" marker.
                  var endMarker = Observable.Return(index.ToString() + " end", testScheduler);

                  return labelled.Concat(endMarker);
              })
              .Merge()
      );

      observer.Messages.AssertEqual(
          OnNext(205, "0 1"),
          OnNext(210, "0 2"),
          OnNext(211, "0 end"),
          OnNext(215, "1 3"),
          OnNext(220, "1 4"),
          OnNext(221, "1 end"),
          OnNext(225, "2 5"),
          OnNext(230, "2 6"),
          OnNext(231, "2 end"),
          OnCompleted<string>(231)
      );
  }
  /// <summary>
  /// Verifies that the observable returned by <c>Observable.Start</c> cannot be cast to
  /// <c>ISubject&lt;int, int&gt;</c>.
  /// </summary>
  [TestMethod]
  public void Bug_1130()
  {
      var started = Observable.Start(() => 5);

      var asSubject = started as ISubject<int, int>;

      Assert.IsNull(asSubject);
  }
  /// <summary>
  /// Subscribes to a <c>GroupBy</c> over an endlessly repeating source and disposes the
  /// subscription shortly afterwards, twice in a row. According to the original notes, the
  /// first dispose deadlocked most of the time, and when it did not, the second one always did.
  /// </summary>
  [TestMethod]
  public void Bug_1286()
  {
      var endless = Observable.Return(new { Name = "test", Value = 0d }, DefaultScheduler.Instance).Repeat();
      var byName = endless.GroupBy(item => item.Name, item => item.Value);

      // Two identical rounds: subscribe, give the source a moment to run, then dispose.
      for (var round = 0; round < 2; round++)
      {
          var subscription = byName.Subscribe(_ => { });
          Thread.Sleep(1);
          subscription.Dispose();
      }
  }
  /// <summary>
  /// Takes the first element of a sequence that yields 1 and then never terminates, and
  /// verifies both the returned value and that the <c>Finally</c> action has run by the time
  /// <c>First</c> returns.
  /// </summary>
  [TestMethod]
  public void Bug_1287()
  {
      var finallyRan = false;

      var source = Observable.Return(1, Scheduler.CurrentThread)
          .Concat(Observable.Never<int>())
          .Finally(() => { finallyRan = true; });

      var first = source.First();

      Assert.AreEqual(1, first);
      Assert.IsTrue(finallyRan);
  }
  120. #if !SILVERLIGHTM7
  /// <summary>
  /// Iterator used by <c>Bug_1333</c>. Obtains a disposable from <paramref name="s"/> via
  /// <c>First()</c>, disposes it on a separate thread, releases <paramref name="sema"/> from that
  /// thread, waits for the thread to finish and then yields a single element.
  /// </summary>
  /// <param name="s">Subject that supplies the disposable to dispose.</param>
  /// <param name="sema">Semaphore released once the disposable has been disposed.</param>
  /// <returns>A sequence containing the single value 1.</returns>
  static IEnumerable<int> Bug_1333_Enumerable(AsyncSubject<IDisposable> s, Semaphore sema)
  {
      var disposable = s.First();

      var worker = new Thread(() =>
      {
          disposable.Dispose();
          sema.Release();
      });

      worker.Start();
      worker.Join();

      yield return 1;
  }
  /// <summary>
  /// Subscribes to an enumerable-backed observable on the default scheduler, hands the resulting
  /// subscription back to the enumerable through an <c>AsyncSubject</c>, and waits for the
  /// enumerable's worker thread to signal that it has disposed that subscription.
  /// The test is limited to 1000 ms by its <c>Timeout</c> attribute.
  /// </summary>
  [TestMethod]
  [Timeout(1000)]
  public void Bug_1333()
  {
      var disposed = new Semaphore(0, 1);
      var handoff = new AsyncSubject<IDisposable>();

      var subscription = Bug_1333_Enumerable(handoff, disposed)
          .ToObservable(DefaultScheduler.Instance)
          .Subscribe();

      // Publish the subscription so the enumerable can dispose it from its worker thread.
      handoff.OnNext(subscription);
      handoff.OnCompleted();

      disposed.WaitOne();
  }
  140. #endif
  /// <summary>
  /// Throttles a hot source by 100 ticks and verifies that, when the source completes at 550,
  /// the pending value 3 is emitted at 550 immediately before completion, and that the source
  /// subscription spans 200 to 550.
  /// </summary>
  [TestMethod]
  public void Bug_1295_Completed()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(300, 1),
          OnNext(350, 2),
          OnNext(500, 3),
          OnCompleted<int>(550)
      );

      Func<IObservable<int>> throttled = () => source.Throttle(TimeSpan.FromTicks(100), testScheduler);
      var observer = testScheduler.Start(throttled);

      observer.Messages.AssertEqual(
          OnNext(450, 2),
          OnNext(550, 3),
          OnCompleted<int>(550)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 550)
      );
  }
  /// <summary>
  /// Throttles a hot source by 100 ticks and verifies that, when the source fails at 550, the
  /// error is reported at 550 without the pending value 3 being emitted, and that the source
  /// subscription spans 200 to 550.
  /// </summary>
  [TestMethod]
  public void Bug_1295_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(300, 1),
          OnNext(350, 2),
          OnNext(500, 3),
          OnError<int>(550, failure)
      );

      Func<IObservable<int>> throttled = () => source.Throttle(TimeSpan.FromTicks(100), testScheduler);
      var observer = testScheduler.Start(throttled);

      observer.Messages.AssertEqual(
          OnNext(450, 2),
          OnError<int>(550, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 550)
      );
  }
  /// <summary>
  /// Verifies that <c>Observable.Catch</c> called with no sources completes at time 200.
  /// </summary>
  [TestMethod]
  public void Bug_1297_Catch_None()
  {
      var testScheduler = new TestScheduler();

      Func<IObservable<int>> create = () => Observable.Catch<int>();
      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          OnCompleted<int>(200)
      );
  }
  /// <summary>
  /// Verifies that <c>Observable.OnErrorResumeNext</c> called with no sources completes at
  /// time 200.
  /// </summary>
  [TestMethod]
  public void Bug_1297_OnErrorResumeNext_None()
  {
      var testScheduler = new TestScheduler();

      Func<IObservable<int>> create = () => Observable.OnErrorResumeNext<int>();
      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          OnCompleted<int>(200)
      );
  }
  /// <summary>
  /// Verifies that <c>Observable.Catch</c> over a single failing source propagates that
  /// source's error, observed at time 201.
  /// </summary>
  [TestMethod]
  public void Bug_1297_Catch_Single()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var failing = Observable.Throw<int>(failure, testScheduler);

      Func<IObservable<int>> create = () => Observable.Catch(failing);
      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          OnError<int>(201, failure)
      );
  }
  /// <summary>
  /// Verifies that <c>Observable.OnErrorResumeNext</c> over a single failing source swallows
  /// the error and completes, observed at time 201.
  /// </summary>
  [TestMethod]
  public void Bug_1297_OnErrorResumeNext_Single()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var failing = Observable.Throw<int>(failure, testScheduler);

      Func<IObservable<int>> create = () => Observable.OnErrorResumeNext(failing);
      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          OnCompleted<int>(201)
      );
  }
  /// <summary>
  /// Verifies that <c>Observable.Catch</c> over three failing sources reports only the error
  /// of the last source, observed at time 203.
  /// </summary>
  [TestMethod]
  public void Bug_1297_Catch_Multi()
  {
      var testScheduler = new TestScheduler();

      var firstError = new Exception();
      var secondError = new Exception();
      var thirdError = new Exception();

      var first = Observable.Throw<int>(firstError, testScheduler);
      var second = Observable.Throw<int>(secondError, testScheduler);
      var third = Observable.Throw<int>(thirdError, testScheduler);

      Func<IObservable<int>> create = () => Observable.Catch(first, second, third);
      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          OnError<int>(203, thirdError)
      );
  }
  /// <summary>
  /// Verifies that <c>Observable.OnErrorResumeNext</c> over three failing sources swallows all
  /// errors and completes, observed at time 203.
  /// </summary>
  [TestMethod]
  public void Bug_1297_OnErrorResumeNext_Multi()
  {
      var testScheduler = new TestScheduler();

      var firstError = new Exception();
      var secondError = new Exception();
      var thirdError = new Exception();

      var first = Observable.Throw<int>(firstError, testScheduler);
      var second = Observable.Throw<int>(secondError, testScheduler);
      var third = Observable.Throw<int>(thirdError, testScheduler);

      Func<IObservable<int>> create = () => Observable.OnErrorResumeNext(first, second, third);
      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          OnCompleted<int>(203)
      );
  }
  /// <summary>
  /// Delays a hot source by 100 ticks and verifies that the source's error at 380 is reported
  /// at 380, cutting off values whose delayed delivery would fall after it, and that the
  /// source subscription spans 200 to 380.
  /// </summary>
  [TestMethod]
  public void Bug_1380()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(220, 1),
          OnNext(250, 2),
          OnNext(270, 3),
          OnNext(290, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(360, 7),
          OnError<int>(380, failure)
      );

      Func<IObservable<int>> delayed = () => source.Delay(TimeSpan.FromTicks(100), testScheduler);
      var observer = testScheduler.Start(delayed);

      observer.Messages.AssertEqual(
          OnNext(320, 1),
          OnNext(350, 2),
          OnNext(370, 3),
          OnError<int>(380, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 380)
      );
  }
  /// <summary>
  /// Verifies that a <c>Finally</c> action placed before <c>Take(5)</c> on a ten-element range
  /// has run by the time <c>ForEach</c> returns.
  /// </summary>
  [TestMethod]
  public void Bug_1356()
  {
      var finallyRan = false;

      var firstFive = Observable.Range(0, 10)
          .Finally(() => { finallyRan = true; })
          .Take(5);

      firstFive.ForEach(_ => { });

      Assert.IsTrue(finallyRan);
  }
  /// <summary>
  /// Multicasts a hot source through a scheduler-bound <c>ReplaySubject</c>: created at 100,
  /// connected at 200, observed from 300 to 500, disconnected at 600. Verifies the messages
  /// the late observer receives and that the source subscription spans 200 to 600.
  /// </summary>
  [TestMethod]
  public void Bug_1381()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext( 90, 1),
          OnNext(110, 2),
          OnNext(250, 3),
          OnNext(270, 4),
          OnNext(280, 5),
          OnNext(301, 6),
          OnNext(302, 7),
          OnNext(400, 8),
          OnNext(401, 9),
          OnNext(510, 10)
      );

      var observer = testScheduler.CreateObserver<int>();

      // Assigned by the scheduled actions below as virtual time advances.
      IConnectableObservable<int> replayed = null;
      IDisposable connection = null;
      IDisposable subscription = null;

      testScheduler.ScheduleAbsolute(100, () => { replayed = source.Multicast(new ReplaySubject<int>(testScheduler)); });
      testScheduler.ScheduleAbsolute(200, () => { connection = replayed.Connect(); });
      testScheduler.ScheduleAbsolute(300, () => { subscription = replayed.Subscribe(observer); });
      testScheduler.ScheduleAbsolute(500, () => { subscription.Dispose(); });
      testScheduler.ScheduleAbsolute(600, () => { connection.Dispose(); });

      testScheduler.Start();

      observer.Messages.AssertEqual(
          OnNext(301, 3),
          OnNext(302, 4),
          OnNext(303, 5),
          OnNext(304, 6),
          OnNext(305, 7),
          OnNext(401, 8),
          OnNext(402, 9)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Calls <c>OnNext</c> on a synchronized subject from inside its own observer and verifies
  /// the recorded trace is 1, -1, 2, -2, 3, -3, i.e. each notification is fully handled before
  /// the next one is delivered.
  /// </summary>
  [TestMethod]
  public void Reentrant_Subject()
  {
      var subject = Subject.Synchronize(new Subject<int>(), Scheduler.Immediate);
      var trace = new List<int>();

      subject.Subscribe(value =>
      {
          trace.Add(value);

          // Re-enter the subject while still handling the current value.
          if (value < 3)
          {
              subject.OnNext(value + 1);
          }

          trace.Add(-value);
      });

      subject.OnNext(1);

      trace.AssertEqual(1, -1, 2, -2, 3, -3);
  }
  /// <summary>
  /// Merges a published three-element sequence with itself and verifies that each element is
  /// delivered twice before the next element appears.
  /// </summary>
  [TestMethod]
  public void Merge_Trampoline1()
  {
      var values = new[] { 1, 2, 3 }.ToObservable();
      var merged = values.Publish(shared => shared.Merge(shared));

      var received = new List<int>();
      merged.Subscribe(value => received.Add(value));

      received.AssertEqual(1, 1, 2, 2, 3, 3);
  }
  /// <summary>
  /// Merges four references to a published three-element sequence and verifies that each
  /// element is delivered four times before the next element appears.
  /// </summary>
  [TestMethod]
  public void Merge_Trampoline2()
  {
      var values = new[] { 1, 2, 3 }.ToObservable();
      var merged = values.Publish(shared => Observable.Merge(shared, shared, shared, shared));

      var received = new List<int>();
      merged.Subscribe(value => received.Add(value));

      received.AssertEqual(1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3);
  }
  372. }
  373. }