QueryLanguage.Multiple.cs 77 KB


  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Collections;
  3. using System.Collections.Generic;
  4. using System.Collections.ObjectModel;
  5. using System.Linq;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Disposables;
  8. using System.Reactive.Subjects;
  9. #if !NO_TPL
  10. using System.Reactive.Threading.Tasks;
  11. using System.Threading.Tasks;
  12. #endif
  13. namespace System.Reactive.Linq
  14. {
  15. #if !NO_PERF
  16. using Observαble;
  17. #endif
  18. internal partial class QueryLanguage
  19. {
  20. #region + Amb +
  /// <summary>
  /// Two-source Amb entry point; hands both sequences to the static two-source helper.
  /// </summary>
  /// <param name="first">First candidate sequence.</param>
  /// <param name="second">Second candidate sequence.</param>
  /// <returns>The sequence produced by the two-source <c>Amb_</c> helper.</returns>
  public virtual IObservable<TSource> Amb<TSource>(IObservable<TSource> first, IObservable<TSource> second)
  {
      var raced = Amb_(first, second);
      return raced;
  }
  /// <summary>
  /// Array-based Amb entry point; forwards the array to the enumerable <c>Amb_</c> helper.
  /// </summary>
  /// <param name="sources">Candidate sequences.</param>
  /// <returns>The sequence produced by the enumerable <c>Amb_</c> helper.</returns>
  public virtual IObservable<TSource> Amb<TSource>(params IObservable<TSource>[] sources)
  {
      var raced = Amb_(sources);
      return raced;
  }
  /// <summary>
  /// Enumerable-based Amb entry point; forwards the collection to the enumerable <c>Amb_</c> helper.
  /// </summary>
  /// <param name="sources">Candidate sequences.</param>
  /// <returns>The sequence produced by the enumerable <c>Amb_</c> helper.</returns>
  public virtual IObservable<TSource> Amb<TSource>(IEnumerable<IObservable<TSource>> sources)
  {
      var raced = Amb_(sources);
      return raced;
  }
  /// <summary>
  /// Folds all sources into a single Amb chain, starting from a never-terminating seed
  /// and applying the binary <c>Amb</c> operator to each element in turn.
  /// </summary>
  /// <param name="sources">Candidate sequences, combined in enumeration order.</param>
  private static IObservable<TSource> Amb_<TSource>(IEnumerable<IObservable<TSource>> sources)
  {
      var seed = Observable.Never<TSource>();
      return sources.Aggregate(seed, (winnerSoFar, candidate) => winnerSoFar.Amb(candidate));
  }
  /// <summary>
  /// Builds the two-source Amb sequence.
  /// </summary>
  /// <remarks>
  /// Unless NO_PERF is defined this simply instantiates the dedicated <c>Amb</c> operator type.
  /// In the NO_PERF fallback, whichever source delivers a notification first (value, error or
  /// completion) is chosen: the other source's subscription is disposed and only the chosen
  /// source's notifications are passed to the observer.
  /// </remarks>
  /// <param name="leftSource">First candidate sequence.</param>
  /// <param name="rightSource">Second candidate sequence.</param>
  private static IObservable<TSource> Amb_<TSource>(IObservable<TSource> leftSource, IObservable<TSource> rightSource)
  {
  #if !NO_PERF
      return new Amb<TSource>(leftSource, rightSource);
  #else
      return new AnonymousObservable<TSource>(observer =>
      {
          var leftSubscription = new SingleAssignmentDisposable();
          var rightSubscription = new SingleAssignmentDisposable();
          var winner = AmbState.Neither;
          var gate = new object();
          var left = new AmbObserver<TSource>();
          var right = new AmbObserver<TSource>();

          // Claims the race for the left source when no side has been chosen yet:
          // drops the right subscription and rewires the left proxy straight to the observer.
          Action claimLeft = () =>
          {
              if (winner != AmbState.Neither)
                  return;

              winner = AmbState.Left;
              rightSubscription.Dispose();
              left.Observer = observer;
          };

          // Mirror image of claimLeft for the right source.
          Action claimRight = () =>
          {
              if (winner != AmbState.Neither)
                  return;

              winner = AmbState.Right;
              leftSubscription.Dispose();
              right.Observer = observer;
          };

          // Both proxies start out synchronized on the same gate.
          left.Observer = Observer.Synchronize(Observer.Create<TSource>(
              value =>
              {
                  claimLeft();
                  if (winner == AmbState.Left)
                      observer.OnNext(value);
              },
              error =>
              {
                  claimLeft();
                  if (winner == AmbState.Left)
                      observer.OnError(error);
              },
              () =>
              {
                  claimLeft();
                  if (winner == AmbState.Left)
                      observer.OnCompleted();
              }
          ), gate);

          right.Observer = Observer.Synchronize(Observer.Create<TSource>(
              value =>
              {
                  claimRight();
                  if (winner == AmbState.Right)
                      observer.OnNext(value);
              },
              error =>
              {
                  claimRight();
                  if (winner == AmbState.Right)
                      observer.OnError(error);
              },
              () =>
              {
                  claimRight();
                  if (winner == AmbState.Right)
                      observer.OnCompleted();
              }
          ), gate);

          leftSubscription.Disposable = leftSource.Subscribe(left);
          rightSubscription.Disposable = rightSource.Subscribe(right);

          return new CompositeDisposable(leftSubscription, rightSubscription);
      });
  #endif
  }
  126. #if NO_PERF
  /// <summary>
  /// Observer proxy whose target can be swapped at runtime; every notification is
  /// forwarded to the observer currently stored in <see cref="Observer"/>.
  /// </summary>
  class AmbObserver<TSource> : IObserver<TSource>
  {
      /// <summary>The observer that currently receives forwarded notifications.</summary>
      public virtual IObserver<TSource> Observer { get; set; }

      /// <summary>Forwards a value to the current target.</summary>
      public virtual void OnNext(TSource value)
      {
          this.Observer.OnNext(value);
      }

      /// <summary>Forwards an error to the current target.</summary>
      public virtual void OnError(Exception error)
      {
          this.Observer.OnError(error);
      }

      /// <summary>Forwards completion to the current target.</summary>
      public virtual void OnCompleted()
      {
          this.Observer.OnCompleted();
      }
  }
  /// <summary>
  /// Records which side, if any, has been chosen by the fallback Amb implementation.
  /// </summary>
  enum AmbState
  {
      /// <summary>The left source was chosen.</summary>
      Left = 0,

      /// <summary>The right source was chosen.</summary>
      Right = 1,

      /// <summary>No source has been chosen yet.</summary>
      Neither = 2
  }
  149. #endif
  150. #endregion
  151. #region + Buffer +
  /// <summary>
  /// Buffers <paramref name="source"/> into lists whose boundaries are driven by the
  /// sequences returned from <paramref name="bufferClosingSelector"/>.
  /// </summary>
  /// <remarks>
  /// With NO_PERF defined this is expressed as Window + SelectMany(ToList); otherwise the
  /// dedicated <c>Buffer</c> operator type is instantiated.
  /// </remarks>
  public virtual IObservable<IList<TSource>> Buffer<TSource, TBufferClosing>(IObservable<TSource> source, Func<IObservable<TBufferClosing>> bufferClosingSelector)
  {
  #if NO_PERF
      var windows = source.Window(bufferClosingSelector);
      return windows.SelectMany(ToList);
  #else
      return new Buffer<TSource, TBufferClosing>(source, bufferClosingSelector);
  #endif
  }
  /// <summary>
  /// Buffers <paramref name="source"/> by windowing it with the given openings and
  /// closing selector, then turning each window into a list.
  /// </summary>
  public virtual IObservable<IList<TSource>> Buffer<TSource, TBufferOpening, TBufferClosing>(IObservable<TSource> source, IObservable<TBufferOpening> bufferOpenings, Func<TBufferOpening, IObservable<TBufferClosing>> bufferClosingSelector)
  {
      var windows = source.Window(bufferOpenings, bufferClosingSelector);
      return windows.SelectMany(ToList);
  }
  /// <summary>
  /// Buffers <paramref name="source"/> into lists delimited by <paramref name="bufferBoundaries"/>.
  /// </summary>
  /// <remarks>
  /// With NO_PERF defined this is expressed as Window + SelectMany(ToList); otherwise the
  /// dedicated <c>Buffer</c> operator type is instantiated.
  /// </remarks>
  public virtual IObservable<IList<TSource>> Buffer<TSource, TBufferBoundary>(IObservable<TSource> source, IObservable<TBufferBoundary> bufferBoundaries)
  {
  #if NO_PERF
      var windows = source.Window(bufferBoundaries);
      return windows.SelectMany(ToList);
  #else
      return new Buffer<TSource, TBufferBoundary>(source, bufferBoundaries);
  #endif
  }
  172. #endregion
  173. #region + Catch +
  /// <summary>
  /// Continues <paramref name="source"/> with the sequence produced by
  /// <paramref name="handler"/> when it fails with a <typeparamref name="TException"/>.
  /// </summary>
  /// <remarks>
  /// In the NO_PERF fallback: errors that are not a <typeparamref name="TException"/> are
  /// forwarded unchanged, and an exception thrown by the handler itself is reported through
  /// the observer's OnError.
  /// </remarks>
  public virtual IObservable<TSource> Catch<TSource, TException>(IObservable<TSource> source, Func<TException, IObservable<TSource>> handler) where TException : Exception
  {
  #if !NO_PERF
      return new Catch<TSource, TException>(source, handler);
  #else
      return new AnonymousObservable<TSource>(observer =>
      {
          var serial = new SerialDisposable();

          var primary = new SingleAssignmentDisposable();
          serial.Disposable = primary;
          primary.Disposable = source.Subscribe(
              observer.OnNext,
              error =>
              {
                  var typed = error as TException;
                  if (typed == null)
                  {
                      // Not the exception type being handled: pass it through.
                      observer.OnError(error);
                      return;
                  }

                  IObservable<TSource> continuation;
                  try
                  {
                      continuation = handler(typed);
                  }
                  catch (Exception handlerFailure)
                  {
                      observer.OnError(handlerFailure);
                      return;
                  }

                  // Swap the serial slot over to the continuation's subscription.
                  var secondary = new SingleAssignmentDisposable();
                  serial.Disposable = secondary;
                  secondary.Disposable = continuation.Subscribe(observer);
              },
              observer.OnCompleted);

          return serial;
      });
  #endif
  }
  /// <summary>
  /// Two-source Catch; packs both sequences into an array and forwards to <c>Catch_</c>.
  /// </summary>
  public virtual IObservable<TSource> Catch<TSource>(IObservable<TSource> first, IObservable<TSource> second)
  {
      var pair = new[] { first, second };
      return Catch_<TSource>(pair);
  }
  /// <summary>
  /// Array-based Catch; forwards the array to <c>Catch_</c>.
  /// </summary>
  public virtual IObservable<TSource> Catch<TSource>(params IObservable<TSource>[] sources)
  {
      var chained = Catch_<TSource>(sources);
      return chained;
  }
  /// <summary>
  /// Enumerable-based Catch; forwards the collection to <c>Catch_</c>.
  /// </summary>
  public virtual IObservable<TSource> Catch<TSource>(IEnumerable<IObservable<TSource>> sources)
  {
      var chained = Catch_<TSource>(sources);
      return chained;
  }
  /// <summary>
  /// Subscribes to the sources one after another, moving to the next source whenever the
  /// current one fails.
  /// </summary>
  /// <remarks>
  /// In the NO_PERF fallback: a source that completes normally completes the result; when the
  /// enumerator is exhausted the last recorded error (if any) is reported, otherwise the result
  /// completes. Exceptions thrown while advancing the enumerator are reported via OnError.
  /// </remarks>
  private static IObservable<TSource> Catch_<TSource>(IEnumerable<IObservable<TSource>> sources)
  {
  #if !NO_PERF
      return new Catch<TSource>(sources);
  #else
      return new AnonymousObservable<TSource>(observer =>
      {
          var gate = new AsyncLock();
          var isDisposed = false;
          var enumerator = sources.GetEnumerator();
          var subscription = new SerialDisposable();
          var lastException = default(Exception);

          var cancelable = SchedulerDefaults.TailRecursion.Schedule(self => gate.Wait(() =>
          {
              if (isDisposed)
                  return;

              IObservable<TSource> current = null;
              bool hasNext = false;
              Exception failure = null;

              try
              {
                  hasNext = enumerator.MoveNext();
                  if (hasNext)
                      current = enumerator.Current;
                  else
                      enumerator.Dispose();
              }
              catch (Exception exception)
              {
                  failure = exception;
                  enumerator.Dispose();
              }

              if (failure != null)
              {
                  observer.OnError(failure);
                  return;
              }

              if (!hasNext)
              {
                  // Out of sources: surface the most recent failure, if there was one.
                  if (lastException == null)
                      observer.OnCompleted();
                  else
                      observer.OnError(lastException);
                  return;
              }

              var inner = new SingleAssignmentDisposable();
              subscription.Disposable = inner;
              inner.Disposable = current.Subscribe(
                  observer.OnNext,
                  error =>
                  {
                      // Remember the error and reschedule to try the next source.
                      lastException = error;
                      self();
                  },
                  observer.OnCompleted);
          }));

          var cleanup = Disposable.Create(() => gate.Wait(() =>
          {
              enumerator.Dispose();
              isDisposed = true;
          }));

          return new CompositeDisposable(subscription, cancelable, cleanup);
      });
  #endif
  }
  287. #endregion
  288. #region + CombineLatest +
  /// <summary>
  /// Combines two sequences by applying <paramref name="resultSelector"/> to the most recent
  /// value of each whenever either one produces a value.
  /// </summary>
  /// <remarks>
  /// In the NO_PERF fallback: nothing is emitted until both sides have produced a value; an
  /// exception from the selector is reported via OnError; the result completes when both sides
  /// have completed, or when one side produces a value while the other has already completed
  /// without ever producing one.
  /// </remarks>
  public virtual IObservable<TResult> CombineLatest<TFirst, TSecond, TResult>(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
  {
  #if !NO_PERF
      return new CombineLatest<TFirst, TSecond, TResult>(first, second, resultSelector);
  #else
      return new AnonymousObservable<TResult>(observer =>
      {
          var hasLeft = false;
          var hasRight = false;
          var left = default(TFirst);
          var right = default(TSecond);
          var leftDone = false;
          var rightDone = false;
          var leftSubscription = new SingleAssignmentDisposable();
          var rightSubscription = new SingleAssignmentDisposable();
          var gate = new object();

          // Runs the selector over the current pair and emits the result, routing a
          // selector exception to OnError.
          Action publish = () =>
          {
              TResult combined;
              try
              {
                  combined = resultSelector(left, right);
              }
              catch (Exception selectorFailure)
              {
                  observer.OnError(selectorFailure);
                  return;
              }
              observer.OnNext(combined);
          };

          leftSubscription.Disposable = first.Synchronize(gate).Subscribe(
              value =>
              {
                  hasLeft = true;
                  left = value;

                  if (hasRight)
                      publish();
                  else if (rightDone)
                      observer.OnCompleted();
              },
              observer.OnError,
              () =>
              {
                  leftDone = true;
                  if (rightDone)
                      observer.OnCompleted();
              });

          rightSubscription.Disposable = second.Synchronize(gate).Subscribe(
              value =>
              {
                  hasRight = true;
                  right = value;

                  if (hasLeft)
                      publish();
                  else if (leftDone)
                      observer.OnCompleted();
              },
              observer.OnError,
              () =>
              {
                  rightDone = true;
                  if (leftDone)
                      observer.OnCompleted();
              });

          return new CompositeDisposable(leftSubscription, rightSubscription);
      });
  #endif
  }
  381. #if !NO_PERF
  382. /* The following code is generated by a tool checked in to $/.../Source/Tools/CodeGenerators. */
  383. #region CombineLatest auto-generated code (6/10/2012 7:25:03 PM)
  /// <summary>Creates the 3-source CombineLatest operator from the given sources and result selector.</summary>
  public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TResult>(
      IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3,
      Func<TSource1, TSource2, TSource3, TResult> resultSelector)
  {
      var combined = new CombineLatest<TSource1, TSource2, TSource3, TResult>(
          source1, source2, source3, resultSelector);
      return combined;
  }
  /// <summary>Creates the 4-source CombineLatest operator from the given sources and result selector.</summary>
  public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TResult>(
      IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4,
      Func<TSource1, TSource2, TSource3, TSource4, TResult> resultSelector)
  {
      var combined = new CombineLatest<TSource1, TSource2, TSource3, TSource4, TResult>(
          source1, source2, source3, source4, resultSelector);
      return combined;
  }
  392. #if !NO_LARGEARITY
  /// <summary>Creates the 5-source CombineLatest operator from the given sources and result selector.</summary>
  public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TResult>(
      IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4,
      IObservable<TSource5> source5,
      Func<TSource1, TSource2, TSource3, TSource4, TSource5, TResult> resultSelector)
  {
      var combined = new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TResult>(
          source1, source2, source3, source4, source5, resultSelector);
      return combined;
  }
  /// <summary>Creates the 6-source CombineLatest operator from the given sources and result selector.</summary>
  public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TResult>(
      IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4,
      IObservable<TSource5> source5, IObservable<TSource6> source6,
      Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TResult> resultSelector)
  {
      var combined = new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TResult>(
          source1, source2, source3, source4, source5, source6, resultSelector);
      return combined;
  }
  /// <summary>Creates the 7-source CombineLatest operator from the given sources and result selector.</summary>
  public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TResult>(
      IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4,
      IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7,
      Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TResult> resultSelector)
  {
      var combined = new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TResult>(
          source1, source2, source3, source4, source5, source6, source7, resultSelector);
      return combined;
  }
  /// <summary>Creates the 8-source CombineLatest operator from the given sources and result selector.</summary>
  public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult>(
      IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4,
      IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8,
      Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult> resultSelector)
  {
      var combined = new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8, resultSelector);
      return combined;
  }
  /// <summary>Creates the 9-source CombineLatest operator from the given sources and result selector.</summary>
  public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TResult>(
      IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4,
      IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8,
      IObservable<TSource9> source9,
      Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TResult> resultSelector)
  {
      var combined = new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8, source9, resultSelector);
      return combined;
  }
  /// <summary>Creates the 10-source CombineLatest operator from the given sources and result selector.</summary>
  public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TResult>(
      IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4,
      IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8,
      IObservable<TSource9> source9, IObservable<TSource10> source10,
      Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TResult> resultSelector)
  {
      var combined = new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, resultSelector);
      return combined;
  }
  /// <summary>Creates the 11-source CombineLatest operator from the given sources and result selector.</summary>
  public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TResult>(
      IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4,
      IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8,
      IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11,
      Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TResult> resultSelector)
  {
      var combined = new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, resultSelector);
      return combined;
  }
  /// <summary>Creates the 12-source CombineLatest operator from the given sources and result selector.</summary>
  public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TResult>(
      IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4,
      IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8,
      IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12,
      Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TResult> resultSelector)
  {
      var combined = new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, resultSelector);
      return combined;
  }
  /// <summary>Creates the 13-source CombineLatest operator from the given sources and result selector.</summary>
  public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TResult>(
      IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4,
      IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8,
      IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12,
      IObservable<TSource13> source13,
      Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TResult> resultSelector)
  {
      var combined = new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, resultSelector);
      return combined;
  }
  /// <summary>Creates the 14-source CombineLatest operator from the given sources and result selector.</summary>
  public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TResult>(
      IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4,
      IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8,
      IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12,
      IObservable<TSource13> source13, IObservable<TSource14> source14,
      Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TResult> resultSelector)
  {
      var combined = new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, source14, resultSelector);
      return combined;
  }
  /// <summary>Creates the 15-source CombineLatest operator from the given sources and result selector.</summary>
  public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TResult>(
      IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4,
      IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8,
      IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12,
      IObservable<TSource13> source13, IObservable<TSource14> source14, IObservable<TSource15> source15,
      Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TResult> resultSelector)
  {
      var combined = new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, source14, source15, resultSelector);
      return combined;
  }
  /// <summary>Creates the 16-source CombineLatest operator from the given sources and result selector.</summary>
  public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16, TResult>(
      IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4,
      IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8,
      IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12,
      IObservable<TSource13> source13, IObservable<TSource14> source14, IObservable<TSource15> source15, IObservable<TSource16> source16,
      Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16, TResult> resultSelector)
  {
      var combined = new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16, TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, source14, source15, source16, resultSelector);
      return combined;
  }
  441. #endif
  442. #endregion
  443. #endif
  /// <summary>
  /// List-based CombineLatest with a caller-supplied selector; forwards to <c>CombineLatest_</c>.
  /// </summary>
  public virtual IObservable<TResult> CombineLatest<TSource, TResult>(IEnumerable<IObservable<TSource>> sources, Func<IList<TSource>, TResult> resultSelector)
  {
      var combined = CombineLatest_<TSource, TResult>(sources, resultSelector);
      return combined;
  }
  /// <summary>
  /// List-based CombineLatest without a selector; each emitted element is a fresh list
  /// copied (via ToList) from the list handed to the selector.
  /// </summary>
  public virtual IObservable<IList<TSource>> CombineLatest<TSource>(IEnumerable<IObservable<TSource>> sources)
  {
      Func<IList<TSource>, IList<TSource>> snapshot = latest => latest.ToList();
      return CombineLatest_<TSource, IList<TSource>>(sources, snapshot);
  }
  /// <summary>
  /// Array-based CombineLatest without a selector; each emitted element is a fresh list
  /// copied (via ToList) from the list handed to the selector.
  /// </summary>
  public virtual IObservable<IList<TSource>> CombineLatest<TSource>(params IObservable<TSource>[] sources)
  {
      Func<IList<TSource>, IList<TSource>> snapshot = latest => latest.ToList();
      return CombineLatest_<TSource, IList<TSource>>(sources, snapshot);
  }
  /// <summary>
  /// Combines an arbitrary number of same-typed sources, invoking
  /// <paramref name="resultSelector"/> with the latest value of every source.
  /// </summary>
  /// <remarks>
  /// In the NO_PERF fallback: the selector receives a read-only wrapper over the internal
  /// list of latest values; nothing is emitted until every source has produced a value; the
  /// result completes when all sources have completed, or when a source produces a value while
  /// not every source has a value yet and all the other sources have already completed.
  /// </remarks>
  private static IObservable<TResult> CombineLatest_<TSource, TResult>(IEnumerable<IObservable<TSource>> sources, Func<IList<TSource>, TResult> resultSelector)
  {
  #if !NO_PERF
      return new CombineLatest<TSource, TResult>(sources, resultSelector);
  #else
      return new AnonymousObservable<TResult>(observer =>
      {
          var inputs = sources.ToArray();
          var count = inputs.Length;

          var hasValue = new bool[count];
          var everyoneHasValue = false;

          var latest = new List<TSource>(count);
          for (var slot = 0; slot < count; slot++)
              latest.Add(default(TSource));

          var isDone = new bool[count];

          // Called after source `index` stored a new value.
          Action<int> next = index =>
          {
              hasValue[index] = true;

              if (!everyoneHasValue)
                  everyoneHasValue = hasValue.All(Stubs<bool>.I);

              if (everyoneHasValue)
              {
                  TResult combined;
                  try
                  {
                      combined = resultSelector(new ReadOnlyCollection<TSource>(latest));
                  }
                  catch (Exception selectorFailure)
                  {
                      observer.OnError(selectorFailure);
                      return;
                  }
                  observer.OnNext(combined);
                  return;
              }

              // Some source is still missing a value; if every other source is finished, complete.
              if (isDone.Where((flag, position) => position != index).All(Stubs<bool>.I))
                  observer.OnCompleted();
          };

          // Called when source `index` completes.
          Action<int> done = index =>
          {
              isDone[index] = true;
              if (isDone.All(Stubs<bool>.I))
                  observer.OnCompleted();
          };

          var subscriptions = new SingleAssignmentDisposable[count];
          var gate = new object();

          for (var n = 0; n < count; n++)
          {
              var j = n; // per-iteration copy captured by the lambdas below

              var slotSubscription = new SingleAssignmentDisposable();
              slotSubscription.Disposable = inputs[j].Synchronize(gate).Subscribe(
                  value =>
                  {
                      latest[j] = value;
                      next(j);
                  },
                  observer.OnError,
                  () => done(j));
              subscriptions[j] = slotSubscription;
          }

          return new CompositeDisposable(subscriptions);
      });
  #endif
  }
  528. #endregion
  529. #region + Concat +
  /// <summary>
  /// Two-source Concat; packs both sequences into an array and forwards to <c>Concat_</c>.
  /// </summary>
  public virtual IObservable<TSource> Concat<TSource>(IObservable<TSource> first, IObservable<TSource> second)
  {
      var pair = new[] { first, second };
      return Concat_<TSource>(pair);
  }
  /// <summary>
  /// Array-based Concat; forwards the array to <c>Concat_</c>.
  /// </summary>
  public virtual IObservable<TSource> Concat<TSource>(params IObservable<TSource>[] sources)
  {
      var concatenated = Concat_<TSource>(sources);
      return concatenated;
  }
  /// <summary>
  /// Enumerable-based Concat; forwards the collection to <c>Concat_</c>.
  /// </summary>
  public virtual IObservable<TSource> Concat<TSource>(IEnumerable<IObservable<TSource>> sources)
  {
      var concatenated = Concat_<TSource>(sources);
      return concatenated;
  }
  /// <summary>
  /// Subscribes to the sources one after another, moving to the next source each time the
  /// current one completes.
  /// </summary>
  /// <remarks>
  /// In the NO_PERF fallback: an error from any source, or an exception thrown while advancing
  /// the enumerator, is reported via OnError; the result completes once the enumerator is
  /// exhausted.
  /// </remarks>
  private static IObservable<TSource> Concat_<TSource>(IEnumerable<IObservable<TSource>> sources)
  {
  #if !NO_PERF
      return new Concat<TSource>(sources);
  #else
      return new AnonymousObservable<TSource>(observer =>
      {
          var isDisposed = false;
          var enumerator = sources.GetEnumerator();
          var subscription = new SerialDisposable();
          var gate = new AsyncLock();

          var cancelable = SchedulerDefaults.TailRecursion.Schedule(self => gate.Wait(() =>
          {
              if (isDisposed)
                  return;

              IObservable<TSource> current = null;
              bool hasNext = false;
              Exception failure = null;

              try
              {
                  hasNext = enumerator.MoveNext();
                  if (hasNext)
                      current = enumerator.Current;
                  else
                      enumerator.Dispose();
              }
              catch (Exception exception)
              {
                  failure = exception;
                  enumerator.Dispose();
              }

              if (failure != null)
              {
                  observer.OnError(failure);
                  return;
              }

              if (!hasNext)
              {
                  observer.OnCompleted();
                  return;
              }

              // Completion of the current source reschedules this action for the next one.
              var inner = new SingleAssignmentDisposable();
              subscription.Disposable = inner;
              inner.Disposable = current.Subscribe(observer.OnNext, observer.OnError, self);
          }));

          var cleanup = Disposable.Create(() => gate.Wait(() =>
          {
              enumerator.Dispose();
              isDisposed = true;
          }));

          return new CompositeDisposable(subscription, cancelable, cleanup);
      });
  #endif
  }
  /// <summary>
  /// Concat over an observable of observables; forwards to the observable <c>Concat_</c> helper.
  /// </summary>
  public virtual IObservable<TSource> Concat<TSource>(IObservable<IObservable<TSource>> sources)
  {
      var concatenated = Concat_<TSource>(sources);
      return concatenated;
  }
  602. #if !NO_TPL
  /// <summary>
  /// Concat over an observable of tasks; each task is projected through
  /// <c>TaskObservableExtensions.ToObservable</c> before being handed to <c>Concat_</c>.
  /// </summary>
  public virtual IObservable<TSource> Concat<TSource>(IObservable<Task<TSource>> sources)
  {
      var taskSequences = Select(sources, TaskObservableExtensions.ToObservable);
      return Concat_<TSource>(taskSequences);
  }
  607. #endif
  /// <summary>
  /// Implements Concat over an observable of observables as a Merge with a maximum
  /// concurrency of one.
  /// </summary>
  private IObservable<TSource> Concat_<TSource>(IObservable<IObservable<TSource>> sources)
  {
      const int oneAtATime = 1;
      return Merge(sources, oneAtATime);
  }
  612. #endregion
  613. #region + Merge +
  /// <summary>
  /// Merge over an observable of observables; forwards to the unbounded <c>Merge_</c> helper.
  /// </summary>
  public virtual IObservable<TSource> Merge<TSource>(IObservable<IObservable<TSource>> sources)
  {
      var merged = Merge_<TSource>(sources);
      return merged;
  }
  618. #if !NO_TPL
  /// <summary>
  /// Merge over an observable of tasks; each task is projected through
  /// <c>TaskObservableExtensions.ToObservable</c> before being handed to <c>Merge_</c>.
  /// </summary>
  public virtual IObservable<TSource> Merge<TSource>(IObservable<Task<TSource>> sources)
  {
      var taskSequences = Select(sources, TaskObservableExtensions.ToObservable);
      return Merge_<TSource>(taskSequences);
  }
  623. #endif
  /// <summary>
  /// Merge over an observable of observables with a concurrency limit; forwards to the
  /// bounded <c>Merge_</c> helper.
  /// </summary>
  public virtual IObservable<TSource> Merge<TSource>(IObservable<IObservable<TSource>> sources, int maxConcurrent)
  {
      var merged = Merge_<TSource>(sources, maxConcurrent);
      return merged;
  }
  /// <summary>
  /// Merge over an enumerable with a concurrency limit; the enumerable is converted with
  /// <c>ToObservable()</c> and passed to the bounded <c>Merge_</c> helper.
  /// </summary>
  public virtual IObservable<TSource> Merge<TSource>(IEnumerable<IObservable<TSource>> sources, int maxConcurrent)
  {
      var observableOfSources = sources.ToObservable();
      return Merge_<TSource>(observableOfSources, maxConcurrent);
  }
  /// <summary>
  /// Merge over an enumerable with a concurrency limit; the enumerable is converted with
  /// <c>ToObservable(scheduler)</c> and passed to the bounded <c>Merge_</c> helper.
  /// </summary>
  public virtual IObservable<TSource> Merge<TSource>(IEnumerable<IObservable<TSource>> sources, int maxConcurrent, IScheduler scheduler)
  {
      var observableOfSources = sources.ToObservable(scheduler);
      return Merge_<TSource>(observableOfSources, maxConcurrent);
  }
  /// <summary>
  /// Two-source Merge; both sequences are packed into an array, converted with
  /// <c>ToObservable()</c> and passed to the unbounded <c>Merge_</c> helper.
  /// </summary>
  public virtual IObservable<TSource> Merge<TSource>(IObservable<TSource> first, IObservable<TSource> second)
  {
      var pair = new[] { first, second };
      return Merge_<TSource>(pair.ToObservable());
  }
  /// <summary>
  /// Two-source Merge; both sequences are packed into an array, converted with
  /// <c>ToObservable(scheduler)</c> and passed to the unbounded <c>Merge_</c> helper.
  /// </summary>
  public virtual IObservable<TSource> Merge<TSource>(IObservable<TSource> first, IObservable<TSource> second, IScheduler scheduler)
  {
      var pair = new[] { first, second };
      return Merge_<TSource>(pair.ToObservable(scheduler));
  }
  /// <summary>
  /// Array-based Merge; the array is converted with <c>ToObservable()</c> and passed to the
  /// unbounded <c>Merge_</c> helper.
  /// </summary>
  public virtual IObservable<TSource> Merge<TSource>(params IObservable<TSource>[] sources)
  {
      var observableOfSources = sources.ToObservable();
      return Merge_<TSource>(observableOfSources);
  }
  /// <summary>
  /// Array-based Merge; the array is converted with <c>ToObservable(scheduler)</c> and passed
  /// to the unbounded <c>Merge_</c> helper.
  /// </summary>
  public virtual IObservable<TSource> Merge<TSource>(IScheduler scheduler, params IObservable<TSource>[] sources)
  {
      var observableOfSources = sources.ToObservable(scheduler);
      return Merge_<TSource>(observableOfSources);
  }
  /// <summary>
  /// Enumerable-based Merge; the collection is converted with <c>ToObservable()</c> and passed
  /// to the unbounded <c>Merge_</c> helper.
  /// </summary>
  public virtual IObservable<TSource> Merge<TSource>(IEnumerable<IObservable<TSource>> sources)
  {
      var observableOfSources = sources.ToObservable();
      return Merge_<TSource>(observableOfSources);
  }
  /// <summary>
  /// Enumerable-based Merge; the collection is converted with <c>ToObservable(scheduler)</c>
  /// and passed to the unbounded <c>Merge_</c> helper.
  /// </summary>
  public virtual IObservable<TSource> Merge<TSource>(IEnumerable<IObservable<TSource>> sources, IScheduler scheduler)
  {
      var observableOfSources = sources.ToObservable(scheduler);
      return Merge_<TSource>(observableOfSources);
  }
  /// <summary>
  /// Merges every inner sequence produced by <paramref name="sources"/> into one sequence,
  /// subscribing to each inner sequence as soon as it arrives.
  /// </summary>
  /// <remarks>
  /// In the NO_PERF fallback: all calls into the observer are made under a shared lock; an
  /// error from the outer or any inner sequence is forwarded immediately; completion is
  /// signalled once the outer sequence has stopped and the outer subscription is the only
  /// entry left in the group.
  /// </remarks>
  private static IObservable<TSource> Merge_<TSource>(IObservable<IObservable<TSource>> sources)
  {
  #if !NO_PERF
      return new Merge<TSource>(sources);
  #else
      return new AnonymousObservable<TSource>(observer =>
      {
          var gate = new object();
          var isStopped = false;

          var outerSubscription = new SingleAssignmentDisposable();
          var group = new CompositeDisposable();
          group.Add(outerSubscription);

          outerSubscription.Disposable = sources.Subscribe(
              innerSource =>
              {
                  var innerSubscription = new SingleAssignmentDisposable();
                  group.Add(innerSubscription);

                  innerSubscription.Disposable = innerSource.Subscribe(
                      value =>
                      {
                          lock (gate)
                          {
                              observer.OnNext(value);
                          }
                      },
                      error =>
                      {
                          lock (gate)
                          {
                              observer.OnError(error);
                          }
                      },
                      () =>
                      {
                          // The removal MUST happen before the check that follows.
                          group.Remove(innerSubscription);

                          // isStopped must be read before group.Count so the outer sequence
                          // is known not to be creating more inner subscriptions.
                          if (isStopped && group.Count == 1)
                          {
                              lock (gate)
                              {
                                  observer.OnCompleted();
                              }
                          }
                      });
              },
              error =>
              {
                  lock (gate)
                  {
                      observer.OnError(error);
                  }
              },
              () =>
              {
                  // The flag MUST be set before the check that follows.
                  isStopped = true;
                  if (group.Count == 1)
                  {
                      lock (gate)
                      {
                          observer.OnCompleted();
                      }
                  }
              });

          return group;
      });
  #endif
  }
  // Merges inner sequences while keeping at most `maxConcurrent` of them subscribed at once.
  // When NO_PERF is not defined this returns a Merge<TSource> operator object; otherwise
  // the anonymous implementation below queues the inner sequences that exceed the limit.
  private static IObservable<TSource> Merge_<TSource>(IObservable<IObservable<TSource>> sources, int maxConcurrent)
  {
  #if !NO_PERF
      return new Merge<TSource>(sources, maxConcurrent);
  #else
      return new AnonymousObservable<TSource>(observer =>
      {
          var gate = new object();
          var pending = new Queue<IObservable<TSource>>();   // inner sequences waiting for a free slot
          var outerDone = false;
          var group = new CompositeDisposable();
          var active = 0;                                    // number of occupied slots

          var subscribeInner = default(Action<IObservable<TSource>>);
          subscribeInner = inner =>
          {
              var handle = new SingleAssignmentDisposable();
              group.Add(handle);

              handle.Disposable = inner.Subscribe(
                  value =>
                  {
                      lock (gate)
                      {
                          observer.OnNext(value);
                      }
                  },
                  error =>
                  {
                      lock (gate)
                      {
                          observer.OnError(error);
                      }
                  },
                  () =>
                  {
                      group.Remove(handle);

                      lock (gate)
                      {
                          if (pending.Count > 0)
                          {
                              // The freed slot is handed straight to the next queued sequence,
                              // so the active count is left untouched.
                              var queued = pending.Dequeue();
                              subscribeInner(queued);
                          }
                          else
                          {
                              active--;
                              if (outerDone && active == 0)
                              {
                                  observer.OnCompleted();
                              }
                          }
                      }
                  });
          };

          group.Add(sources.Subscribe(
              inner =>
              {
                  lock (gate)
                  {
                      if (active < maxConcurrent)
                      {
                          active++;
                          subscribeInner(inner);
                      }
                      else
                      {
                          pending.Enqueue(inner);
                      }
                  }
              },
              error =>
              {
                  lock (gate)
                  {
                      observer.OnError(error);
                  }
              },
              () =>
              {
                  lock (gate)
                  {
                      outerDone = true;
                      if (active == 0)
                      {
                          observer.OnCompleted();
                      }
                  }
              }));

          return group;
      });
  #endif
  }
  790. #endregion
  791. #region + OnErrorResumeNext +
  // Two-source overload: wraps both sequences in an array and forwards to OnErrorResumeNext_.
  public virtual IObservable<TSource> OnErrorResumeNext<TSource>(IObservable<TSource> first, IObservable<TSource> second)
  {
      var pair = new[] { first, second };
      return OnErrorResumeNext_<TSource>(pair);
  }
  // Array overload: forwards the array, viewed as an enumerable, to OnErrorResumeNext_.
  public virtual IObservable<TSource> OnErrorResumeNext<TSource>(params IObservable<TSource>[] sources)
  {
      IEnumerable<IObservable<TSource>> all = sources;
      return OnErrorResumeNext_<TSource>(all);
  }
  // Enumerable overload: forwards directly to OnErrorResumeNext_.
  public virtual IObservable<TSource> OnErrorResumeNext<TSource>(IEnumerable<IObservable<TSource>> sources)
  {
      var result = OnErrorResumeNext_<TSource>(sources);
      return result;
  }
  // Subscribes to the sequences in `sources` one after another, moving on to the next
  // one both when the current sequence completes and when it fails.
  // When NO_PERF is not defined this returns an OnErrorResumeNext<TSource> operator object.
  private static IObservable<TSource> OnErrorResumeNext_<TSource>(IEnumerable<IObservable<TSource>> sources)
  {
  #if !NO_PERF
      return new OnErrorResumeNext<TSource>(sources);
  #else
      return new AnonymousObservable<TSource>(observer =>
      {
          var gate = new AsyncLock();
          var disposed = false;
          var enumerator = sources.GetEnumerator();
          var activeSubscription = new SerialDisposable();

          var cancelable = SchedulerDefaults.TailRecursion.Schedule(moveNext => gate.Wait(() =>
          {
              // Nothing to do once the subscription has been disposed.
              if (disposed)
              {
                  return;
              }

              var nextSource = default(IObservable<TSource>);
              var hasNext = false;
              var failure = default(Exception);

              try
              {
                  hasNext = enumerator.MoveNext();
                  if (hasNext)
                  {
                      nextSource = enumerator.Current;
                  }
                  else
                  {
                      enumerator.Dispose();
                  }
              }
              catch (Exception exception)
              {
                  failure = exception;
                  enumerator.Dispose();
              }

              // A failure of the enumerator itself is reported to the observer.
              if (failure != null)
              {
                  observer.OnError(failure);
                  return;
              }

              if (!hasNext)
              {
                  observer.OnCompleted();
                  return;
              }

              var handle = new SingleAssignmentDisposable();
              activeSubscription.Disposable = handle;

              // Both an error and a completion of the current sequence trigger the next iteration.
              handle.Disposable = nextSource.Subscribe(observer.OnNext, exception => moveNext(), moveNext);
          }));

          var enumeratorCleanup = Disposable.Create(() => gate.Wait(() =>
          {
              enumerator.Dispose();
              disposed = true;
          }));

          return new CompositeDisposable(activeSubscription, cancelable, enumeratorCleanup);
      });
  #endif
  }
  860. #endregion
  861. #region + SkipUntil +
  // Drops the elements of `source` until `other` produces its first element.
  // When NO_PERF is not defined this returns a SkipUntil<TSource, TOther> operator object.
  public virtual IObservable<TSource> SkipUntil<TSource, TOther>(IObservable<TSource> source, IObservable<TOther> other)
  {
  #if !NO_PERF
      return new SkipUntil<TSource, TOther>(source, other);
  #else
      return new AnonymousObservable<TSource>(observer =>
      {
          var sourceSubscription = new SingleAssignmentDisposable();
          var otherSubscription = new SingleAssignmentDisposable();

          var isOpen = false;        // flipped by the first element of `other`
          var gate = new object();   // both sequences are synchronized on this object

          sourceSubscription.Disposable = source.Synchronize(gate).Subscribe(
              value =>
              {
                  if (isOpen)
                  {
                      observer.OnNext(value);
                  }
              },
              observer.OnError, // BREAKING CHANGE - Error propagation was guarded by "other" source in v1.0.10621 (due to materialization).
              () =>
              {
                  // Completion is forwarded only when the gate has already been opened.
                  if (isOpen)
                  {
                      observer.OnCompleted();
                  }
              });

          otherSubscription.Disposable = other.Synchronize(gate).Subscribe(
              signal =>
              {
                  isOpen = true;
                  otherSubscription.Dispose();
              },
              observer.OnError);

          return new CompositeDisposable(sourceSubscription, otherSubscription);
      });
  #endif
  }
  898. #endregion
  899. #region + Switch +
  // Public entry point for switching over a sequence of sequences; forwards to Switch_.
  public virtual IObservable<TSource> Switch<TSource>(IObservable<IObservable<TSource>> sources)
  {
      var switched = Switch_<TSource>(sources);
      return switched;
  }
  904. #if !NO_TPL
  // Task-based overload: each task is projected through TaskObservableExtensions.ToObservable
  // and the resulting sequence of sequences is forwarded to Switch_.
  public virtual IObservable<TSource> Switch<TSource>(IObservable<Task<TSource>> sources)
  {
      IObservable<IObservable<TSource>> inners = Select(sources, TaskObservableExtensions.ToObservable);
      return Switch_<TSource>(inners);
  }
  909. #endif
  // Forwards notifications from the most recently received inner sequence only.
  // When NO_PERF is not defined this returns a Switch<TSource> operator object.
  private IObservable<TSource> Switch_<TSource>(IObservable<IObservable<TSource>> sources)
  {
  #if !NO_PERF
      return new Switch<TSource>(sources);
  #else
      return new AnonymousObservable<TSource>(observer =>
      {
          var gate = new object();
          var innerSubscription = new SerialDisposable();
          var outerDone = false;
          var latestId = 0UL;       // id given to the most recent inner sequence
          var innerActive = false;  // true while the latest inner sequence has not completed

          var outerSubscription = sources.Subscribe(
              inner =>
              {
                  ulong myId = 0;
                  lock (gate)
                  {
                      myId = unchecked(++latestId);
                      innerActive = true;
                  }

                  var handle = new SingleAssignmentDisposable();
                  innerSubscription.Disposable = handle;

                  // Every callback checks that this inner sequence is still the latest one.
                  handle.Disposable = inner.Subscribe(
                      value =>
                      {
                          lock (gate)
                          {
                              if (latestId == myId)
                              {
                                  observer.OnNext(value);
                              }
                          }
                      },
                      error =>
                      {
                          lock (gate)
                          {
                              if (latestId == myId)
                              {
                                  observer.OnError(error);
                              }
                          }
                      },
                      () =>
                      {
                          lock (gate)
                          {
                              if (latestId == myId)
                              {
                                  innerActive = false;
                                  if (outerDone)
                                  {
                                      observer.OnCompleted();
                                  }
                              }
                          }
                      });
              },
              error =>
              {
                  lock (gate)
                  {
                      observer.OnError(error);
                  }
              },
              () =>
              {
                  lock (gate)
                  {
                      outerDone = true;
                      if (!innerActive)
                      {
                          observer.OnCompleted();
                      }
                  }
              });

          return new CompositeDisposable(outerSubscription, innerSubscription);
      });
  #endif
  }
  981. #endregion
  982. #region + TakeUntil +
  // Relays `source` until `other` produces an element, at which point the result completes.
  // When NO_PERF is not defined this returns a TakeUntil<TSource, TOther> operator object.
  public virtual IObservable<TSource> TakeUntil<TSource, TOther>(IObservable<TSource> source, IObservable<TOther> other)
  {
  #if !NO_PERF
      return new TakeUntil<TSource, TOther>(source, other);
  #else
      return new AnonymousObservable<TSource>(observer =>
      {
          var sourceSubscription = new SingleAssignmentDisposable();
          var otherSubscription = new SingleAssignmentDisposable();
          var gate = new object();

          // COMPAT - Order of Subscribe calls per v1.0.10621
          otherSubscription.Disposable = other.Synchronize(gate).Subscribe(
              signal =>
              {
                  observer.OnCompleted();
              },
              observer.OnError);

          // Finally releases the subscription to `other` once the source pipeline is done.
          sourceSubscription.Disposable = source
              .Synchronize(gate)
              .Finally(otherSubscription.Dispose)
              .Subscribe(observer);

          return new CompositeDisposable(sourceSubscription, otherSubscription);
      });
  #endif
  }
  1006. #endregion
  1007. #region + Window +
  // Splits `source` into consecutive windows. Each time the sequence returned by
  // `windowClosingSelector` finishes (it is limited with Take(1)), the current window is
  // completed and a fresh one is opened.
  // When NO_PERF is not defined this returns a Window<TSource, TWindowClosing> operator object.
  public virtual IObservable<IObservable<TSource>> Window<TSource, TWindowClosing>(IObservable<TSource> source, Func<IObservable<TWindowClosing>> windowClosingSelector)
  {
  #if !NO_PERF
      return new Window<TSource, TWindowClosing>(source, windowClosingSelector);
  #else
      return new AnonymousObservable<IObservable<TSource>>(observer =>
      {
          var window = new Subject<TSource>();
          var gate = new object();

          var closingSubscription = new SerialDisposable();
          var group = new CompositeDisposable(2) { closingSubscription };
          var refCount = new RefCountDisposable(group);

          // The first window is handed out before the source is subscribed.
          observer.OnNext(window.AddRef(refCount));

          group.Add(source.SubscribeSafe(new AnonymousObserver<TSource>(
              value =>
              {
                  lock (gate)
                  {
                      window.OnNext(value);
                  }
              },
              error =>
              {
                  lock (gate)
                  {
                      window.OnError(error);
                      observer.OnError(error);
                  }
              },
              () =>
              {
                  lock (gate)
                  {
                      window.OnCompleted();
                      observer.OnCompleted();
                  }
              })));

          var closeLock = new AsyncLock();

          Action createWindowClose = null;
          createWindowClose = () =>
          {
              IObservable<TWindowClosing> windowClose;
              try
              {
                  windowClose = windowClosingSelector();
              }
              catch (Exception exception)
              {
                  // NOTE(review): only the outer observer is notified here; the open window is
                  // not signalled, unlike the other error paths in this method. Kept as is.
                  lock (gate)
                  {
                      observer.OnError(exception);
                  }
                  return;
              }

              var closeHandle = new SingleAssignmentDisposable();
              closingSubscription.Disposable = closeHandle;

              closeHandle.Disposable = windowClose.Take(1).SubscribeSafe(new AnonymousObserver<TWindowClosing>(
                  Stubs<TWindowClosing>.Ignore,
                  error =>
                  {
                      lock (gate)
                      {
                          window.OnError(error);
                          observer.OnError(error);
                      }
                  },
                  () =>
                  {
                      // Close the current window and open the next one ...
                      lock (gate)
                      {
                          window.OnCompleted();
                          window = new Subject<TSource>();
                          observer.OnNext(window.AddRef(refCount));
                      }

                      // ... then ask for the next closing sequence.
                      closeLock.Wait(createWindowClose);
                  }));
          };

          closeLock.Wait(createWindowClose);

          return refCount;
      });
  #endif
  }
  // Window overload driven by explicit openings: expressed as a GroupJoin of `windowOpenings`
  // with `source`, where source elements get an empty duration and each group is returned
  // as the window.
  public virtual IObservable<IObservable<TSource>> Window<TSource, TWindowOpening, TWindowClosing>(IObservable<TSource> source, IObservable<TWindowOpening> windowOpenings, Func<TWindowOpening, IObservable<TWindowClosing>> windowClosingSelector)
  {
      return windowOpenings.GroupJoin(
          source,
          windowClosingSelector,
          element => Observable.Empty<Unit>(),
          (opening, window) => window);
  }
  // Splits `source` into consecutive windows; every element of `windowBoundaries` completes
  // the current window and opens a new one.
  // When NO_PERF is not defined this returns a Window<TSource, TWindowBoundary> operator object.
  public virtual IObservable<IObservable<TSource>> Window<TSource, TWindowBoundary>(IObservable<TSource> source, IObservable<TWindowBoundary> windowBoundaries)
  {
  #if !NO_PERF
      return new Window<TSource, TWindowBoundary>(source, windowBoundaries);
  #else
      return new AnonymousObservable<IObservable<TSource>>(observer =>
      {
          var window = new Subject<TSource>();
          var gate = new object();

          var group = new CompositeDisposable(2);
          var refCount = new RefCountDisposable(group);

          // The first window is handed out before either sequence is subscribed.
          observer.OnNext(window.AddRef(refCount));

          var sourceObserver = new AnonymousObserver<TSource>(
              value =>
              {
                  lock (gate)
                  {
                      window.OnNext(value);
                  }
              },
              error =>
              {
                  lock (gate)
                  {
                      window.OnError(error);
                      observer.OnError(error);
                  }
              },
              () =>
              {
                  lock (gate)
                  {
                      window.OnCompleted();
                      observer.OnCompleted();
                  }
              });
          group.Add(source.SubscribeSafe(sourceObserver));

          var boundaryObserver = new AnonymousObserver<TWindowBoundary>(
              boundary =>
              {
                  // Swap the current window for a new one.
                  lock (gate)
                  {
                      window.OnCompleted();
                      window = new Subject<TSource>();
                      observer.OnNext(window.AddRef(refCount));
                  }
              },
              error =>
              {
                  lock (gate)
                  {
                      window.OnError(error);
                      observer.OnError(error);
                  }
              },
              () =>
              {
                  lock (gate)
                  {
                      window.OnCompleted();
                      observer.OnCompleted();
                  }
              });
          group.Add(windowBoundaries.SubscribeSafe(boundaryObserver));

          return refCount;
      });
  #endif
  }
  1162. #endregion
  1163. #region + Zip +
  // Pairs the elements of `first` and `second` in arrival order and projects each pair
  // through `resultSelector`.
  // When NO_PERF is not defined this returns a Zip<TFirst, TSecond, TResult> operator object.
  public virtual IObservable<TResult> Zip<TFirst, TSecond, TResult>(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
  {
  #if !NO_PERF
      return new Zip<TFirst, TSecond, TResult>(first, second, resultSelector);
  #else
      return new AnonymousObservable<TResult>(observer =>
      {
          // Elements that have not yet found a partner from the other side.
          var queueLeft = new Queue<TFirst>();
          var queueRight = new Queue<TSecond>();

          var leftDone = false;
          var rightDone = false;

          var leftSubscription = new SingleAssignmentDisposable();
          var rightSubscription = new SingleAssignmentDisposable();

          var gate = new object();   // both sides are synchronized on this object

          leftSubscription.Disposable = first.Synchronize(gate).Subscribe(
              leftValue =>
              {
                  if (queueRight.Count > 0)
                  {
                      var rightValue = queueRight.Dequeue();

                      TResult combined;
                      try
                      {
                          combined = resultSelector(leftValue, rightValue);
                      }
                      catch (Exception ex)
                      {
                          observer.OnError(ex);
                          return;
                      }

                      observer.OnNext(combined);
                  }
                  else if (rightDone)
                  {
                      // No partner can ever arrive for this element.
                      observer.OnCompleted();
                  }
                  else
                  {
                      queueLeft.Enqueue(leftValue);
                  }
              },
              observer.OnError,
              () =>
              {
                  leftDone = true;
                  if (rightDone)
                  {
                      observer.OnCompleted();
                  }
              });

          rightSubscription.Disposable = second.Synchronize(gate).Subscribe(
              rightValue =>
              {
                  if (queueLeft.Count > 0)
                  {
                      var leftValue = queueLeft.Dequeue();

                      TResult combined;
                      try
                      {
                          combined = resultSelector(leftValue, rightValue);
                      }
                      catch (Exception ex)
                      {
                          observer.OnError(ex);
                          return;
                      }

                      observer.OnNext(combined);
                  }
                  else if (leftDone)
                  {
                      // No partner can ever arrive for this element.
                      observer.OnCompleted();
                  }
                  else
                  {
                      queueRight.Enqueue(rightValue);
                  }
              },
              observer.OnError,
              () =>
              {
                  rightDone = true;
                  if (leftDone)
                  {
                      observer.OnCompleted();
                  }
              });

          var clearQueues = Disposable.Create(() =>
          {
              queueLeft.Clear();
              queueRight.Clear();
          });

          return new CompositeDisposable(leftSubscription, rightSubscription, clearQueues);
      });
  #endif
  }
  // Zips the given sequences into lists via Zip_ and projects every list through `resultSelector`.
  public virtual IObservable<TResult> Zip<TSource, TResult>(IEnumerable<IObservable<TSource>> sources, Func<IList<TSource>, TResult> resultSelector)
  {
      var lists = Zip_<TSource>(sources);
      return lists.Select(resultSelector);
  }
  // Enumerable overload producing lists; forwards directly to Zip_.
  public virtual IObservable<IList<TSource>> Zip<TSource>(IEnumerable<IObservable<TSource>> sources)
  {
      var lists = Zip_<TSource>(sources);
      return lists;
  }
  // Array overload producing lists; the array is forwarded to Zip_ as an enumerable.
  public virtual IObservable<IList<TSource>> Zip<TSource>(params IObservable<TSource>[] sources)
  {
      IEnumerable<IObservable<TSource>> all = sources;
      return Zip_<TSource>(all);
  }
  // Zips any number of sequences: whenever every source has at least one buffered element,
  // one element per source is dequeued and emitted together as a list.
  // When NO_PERF is not defined this returns a Zip<TSource> operator object.
  private static IObservable<IList<TSource>> Zip_<TSource>(IEnumerable<IObservable<TSource>> sources)
  {
  #if !NO_PERF
      return new Zip<TSource>(sources);
  #else
      return new AnonymousObservable<IList<TSource>>(observer =>
      {
          var srcs = sources.ToArray();
          var count = srcs.Length;

          // One buffer per source.
          var queues = new Queue<TSource>[count];
          for (var k = 0; k < count; k++)
          {
              queues[k] = new Queue<TSource>();
          }

          var finished = new bool[count];

          // Called after source `index` has enqueued an element.
          Action<int> next = index =>
          {
              if (queues.All(q => q.Count > 0))
              {
                  var row = queues.Select(q => q.Dequeue()).ToList();
                  observer.OnNext(row);
              }
              else if (finished.Where((flag, position) => position != index).All(Stubs<bool>.I))
              {
                  // Every other source has already completed.
                  observer.OnCompleted();
              }
          };

          // Called when source `index` completes.
          Action<int> done = index =>
          {
              finished[index] = true;
              if (finished.All(Stubs<bool>.I))
              {
                  observer.OnCompleted();
              }
          };

          var subscriptions = new SingleAssignmentDisposable[count];
          var gate = new object();

          for (var i = 0; i < count; i++)
          {
              var j = i;   // per-iteration copy captured by the lambdas below

              var handle = new SingleAssignmentDisposable();
              handle.Disposable = srcs[j].Synchronize(gate).Subscribe(
                  value =>
                  {
                      queues[j].Enqueue(value);
                      next(j);
                  },
                  observer.OnError,
                  () =>
                  {
                      done(j);
                  });
              subscriptions[j] = handle;
          }

          var all = new CompositeDisposable(subscriptions);
          all.Add(Disposable.Create(() =>
          {
              foreach (var q in queues)
              {
                  q.Clear();
              }
          }));
          return all;
      });
  #endif
  }
  1332. #if !NO_PERF
  1333. /* The following code is generated by a tool checked in to $/.../Source/Tools/CodeGenerators. */
  1334. #region Zip auto-generated code (6/10/2012 8:15:28 PM)
  // Arity-3 Zip overload: forwards the sources and the selector to a new
  // Zip<...> operator object (that type is defined outside this chunk).
  public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TResult>(
      IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3,
      Func<TSource1, TSource2, TSource3, TResult> resultSelector)
  {
      var zip = new Zip<TSource1, TSource2, TSource3, TResult>(
          source1, source2, source3,
          resultSelector);
      return zip;
  }
  // Arity-4 Zip overload: forwards the sources and the selector to a new
  // Zip<...> operator object (that type is defined outside this chunk).
  public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TResult>(
      IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4,
      Func<TSource1, TSource2, TSource3, TSource4, TResult> resultSelector)
  {
      var zip = new Zip<TSource1, TSource2, TSource3, TSource4, TResult>(
          source1, source2, source3, source4,
          resultSelector);
      return zip;
  }
  1343. #if !NO_LARGEARITY
  // Arity-5 Zip overload: forwards the sources and the selector to a new
  // Zip<...> operator object (that type is defined outside this chunk).
  public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TResult>(
      IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4,
      IObservable<TSource5> source5,
      Func<TSource1, TSource2, TSource3, TSource4, TSource5, TResult> resultSelector)
  {
      var zip = new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TResult>(
          source1, source2, source3, source4, source5,
          resultSelector);
      return zip;
  }
  // Arity-6 Zip overload: forwards the sources and the selector to a new
  // Zip<...> operator object (that type is defined outside this chunk).
  public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TResult>(
      IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4,
      IObservable<TSource5> source5, IObservable<TSource6> source6,
      Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TResult> resultSelector)
  {
      var zip = new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TResult>(
          source1, source2, source3, source4, source5, source6,
          resultSelector);
      return zip;
  }
  // Arity-7 Zip overload: forwards the sources and the selector to a new
  // Zip<...> operator object (that type is defined outside this chunk).
  public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TResult>(
      IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4,
      IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7,
      Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TResult> resultSelector)
  {
      var zip = new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TResult>(
          source1, source2, source3, source4, source5, source6, source7,
          resultSelector);
      return zip;
  }
  // Arity-8 Zip overload: forwards the sources and the selector to a new
  // Zip<...> operator object (that type is defined outside this chunk).
  public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult>(
      IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4,
      IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8,
      Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult> resultSelector)
  {
      var zip = new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8,
          resultSelector);
      return zip;
  }
  // Arity-9 Zip overload: forwards the sources and the selector to a new
  // Zip<...> operator object (that type is defined outside this chunk).
  public virtual IObservable<TResult> Zip<
      TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8,
      TSource9,
      TResult>(
      IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4,
      IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8,
      IObservable<TSource9> source9,
      Func<
          TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8,
          TSource9,
          TResult> resultSelector)
  {
      var zip = new Zip<
          TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8,
          TSource9,
          TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8,
          source9,
          resultSelector);
      return zip;
  }
  // Arity-10 Zip overload: forwards the sources and the selector to a new
  // Zip<...> operator object (that type is defined outside this chunk).
  public virtual IObservable<TResult> Zip<
      TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8,
      TSource9, TSource10,
      TResult>(
      IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4,
      IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8,
      IObservable<TSource9> source9, IObservable<TSource10> source10,
      Func<
          TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8,
          TSource9, TSource10,
          TResult> resultSelector)
  {
      var zip = new Zip<
          TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8,
          TSource9, TSource10,
          TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8,
          source9, source10,
          resultSelector);
      return zip;
  }
  // Arity-11 Zip overload: forwards the sources and the selector to a new
  // Zip<...> operator object (that type is defined outside this chunk).
  public virtual IObservable<TResult> Zip<
      TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8,
      TSource9, TSource10, TSource11,
      TResult>(
      IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4,
      IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8,
      IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11,
      Func<
          TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8,
          TSource9, TSource10, TSource11,
          TResult> resultSelector)
  {
      var zip = new Zip<
          TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8,
          TSource9, TSource10, TSource11,
          TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8,
          source9, source10, source11,
          resultSelector);
      return zip;
  }
  // Arity-12 Zip overload: forwards the sources and the selector to a new
  // Zip<...> operator object (that type is defined outside this chunk).
  public virtual IObservable<TResult> Zip<
      TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8,
      TSource9, TSource10, TSource11, TSource12,
      TResult>(
      IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4,
      IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8,
      IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12,
      Func<
          TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8,
          TSource9, TSource10, TSource11, TSource12,
          TResult> resultSelector)
  {
      var zip = new Zip<
          TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8,
          TSource9, TSource10, TSource11, TSource12,
          TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8,
          source9, source10, source11, source12,
          resultSelector);
      return zip;
  }
  // Arity-13 Zip overload: forwards the sources and the selector to a new
  // Zip<...> operator object (that type is defined outside this chunk).
  public virtual IObservable<TResult> Zip<
      TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8,
      TSource9, TSource10, TSource11, TSource12, TSource13,
      TResult>(
      IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4,
      IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8,
      IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12,
      IObservable<TSource13> source13,
      Func<
          TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8,
          TSource9, TSource10, TSource11, TSource12, TSource13,
          TResult> resultSelector)
  {
      var zip = new Zip<
          TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8,
          TSource9, TSource10, TSource11, TSource12, TSource13,
          TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8,
          source9, source10, source11, source12, source13,
          resultSelector);
      return zip;
  }
  // Arity-14 Zip overload: forwards the sources and the selector to a new
  // Zip<...> operator object (that type is defined outside this chunk).
  public virtual IObservable<TResult> Zip<
      TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8,
      TSource9, TSource10, TSource11, TSource12, TSource13, TSource14,
      TResult>(
      IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4,
      IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8,
      IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12,
      IObservable<TSource13> source13, IObservable<TSource14> source14,
      Func<
          TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8,
          TSource9, TSource10, TSource11, TSource12, TSource13, TSource14,
          TResult> resultSelector)
  {
      var zip = new Zip<
          TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8,
          TSource9, TSource10, TSource11, TSource12, TSource13, TSource14,
          TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8,
          source9, source10, source11, source12, source13, source14,
          resultSelector);
      return zip;
  }
  // Arity-15 Zip overload: forwards the sources and the selector to a new
  // Zip<...> operator object (that type is defined outside this chunk).
  public virtual IObservable<TResult> Zip<
      TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8,
      TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15,
      TResult>(
      IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4,
      IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8,
      IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12,
      IObservable<TSource13> source13, IObservable<TSource14> source14, IObservable<TSource15> source15,
      Func<
          TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8,
          TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15,
          TResult> resultSelector)
  {
      var zip = new Zip<
          TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8,
          TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15,
          TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8,
          source9, source10, source11, source12, source13, source14, source15,
          resultSelector);
      return zip;
  }
  // Arity-16 Zip overload: forwards the sources and the selector to a new
  // Zip<...> operator object (that type is defined outside this chunk).
  public virtual IObservable<TResult> Zip<
      TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8,
      TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16,
      TResult>(
      IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4,
      IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8,
      IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12,
      IObservable<TSource13> source13, IObservable<TSource14> source14, IObservable<TSource15> source15, IObservable<TSource16> source16,
      Func<
          TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8,
          TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16,
          TResult> resultSelector)
  {
      var zip = new Zip<
          TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8,
          TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16,
          TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8,
          source9, source10, source11, source12, source13, source14, source15, source16,
          resultSelector);
      return zip;
  }
  1392. #endif
  1393. #endregion
  1394. #endif
  // Pairs every element of the observable `first` with the next element pulled from the
  // enumerable `second`; the result completes when either side runs out.
  // When NO_PERF is not defined this returns a Zip<TFirst, TSecond, TResult> operator object.
  public virtual IObservable<TResult> Zip<TFirst, TSecond, TResult>(IObservable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
  {
  #if !NO_PERF
      return new Zip<TFirst, TSecond, TResult>(first, second, resultSelector);
  #else
      return new AnonymousObservable<TResult>(observer =>
      {
          var rightEnumerator = second.GetEnumerator();

          var leftSubscription = first.Subscribe(
              left =>
              {
                  // Step 1: advance the enumerable side.
                  bool hasNext;
                  try
                  {
                      hasNext = rightEnumerator.MoveNext();
                  }
                  catch (Exception ex)
                  {
                      observer.OnError(ex);
                      return;
                  }

                  if (!hasNext)
                  {
                      // The enumerable is exhausted, so no further pairs can be formed.
                      observer.OnCompleted();
                      return;
                  }

                  // Step 2: read the current element.
                  TSecond right;
                  try
                  {
                      right = rightEnumerator.Current;
                  }
                  catch (Exception ex)
                  {
                      observer.OnError(ex);
                      return;
                  }

                  // Step 3: combine the pair.
                  TResult result;
                  try
                  {
                      result = resultSelector(left, right);
                  }
                  catch (Exception ex)
                  {
                      observer.OnError(ex);
                      return;
                  }

                  observer.OnNext(result);
              },
              observer.OnError,
              observer.OnCompleted);

          // Disposing the result also disposes the enumerator.
          return new CompositeDisposable(leftSubscription, rightEnumerator);
      });
  #endif
  }
  1451. #endregion
  1452. #region |> Helpers <|
  1453. #if NO_PERF
  // Helper (NO_PERF builds only): materializes both sources, tags each notification as
  // Left or Right, synchronizes the two streams on one gate and feeds them to the observer
  // produced by `combinerSelector`.
  private static IObservable<TResult> Combine<TLeft, TRight, TResult>(IObservable<TLeft> leftSource, IObservable<TRight> rightSource, Func<IObserver<TResult>, IDisposable, IDisposable, IObserver<Either<Notification<TLeft>, Notification<TRight>>>> combinerSelector)
  {
      return new AnonymousObservable<TResult>(observer =>
      {
          var leftSubscription = new SingleAssignmentDisposable();
          var rightSubscription = new SingleAssignmentDisposable();

          // The combiner is given both subscription handles.
          var combiner = combinerSelector(observer, leftSubscription, rightSubscription);

          var gate = new object();

          leftSubscription.Disposable = leftSource
              .Materialize()
              .Select(notification => Either<Notification<TLeft>, Notification<TRight>>.CreateLeft(notification))
              .Synchronize(gate)
              .Subscribe(combiner);

          rightSubscription.Disposable = rightSource
              .Materialize()
              .Select(notification => Either<Notification<TLeft>, Notification<TRight>>.CreateRight(notification))
              .Synchronize(gate)
              .Subscribe(combiner);

          return new CompositeDisposable(leftSubscription, rightSubscription);
      });
  }
  1467. #endif
  1468. #endregion
  1469. }
  1470. }