1
0

QueryLanguage.Multiple.cs 78 KB


  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Collections;
  3. using System.Collections.Generic;
  4. using System.Collections.ObjectModel;
  5. using System.Linq;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Disposables;
  8. using System.Reactive.Subjects;
  9. #if !NO_TPL
  10. using System.Reactive.Threading.Tasks;
  11. using System.Threading.Tasks;
  12. #endif
  13. namespace System.Reactive.Linq
  14. {
  15. #if !NO_PERF
  16. using ObservableImpl;
  17. #endif
  18. internal partial class QueryLanguage
  19. {
  20. #region + Amb +
  /// <summary>
  /// Two-source Amb entry point; hands both sequences to the pairwise <c>Amb_</c> helper.
  /// </summary>
  /// <param name="first">Sequence passed as the helper's left source.</param>
  /// <param name="second">Sequence passed as the helper's right source.</param>
  /// <returns>The observable the helper builds for the pair.</returns>
  public virtual IObservable<TSource> Amb<TSource>(IObservable<TSource> first, IObservable<TSource> second)
  {
      IObservable<TSource> raced = Amb_<TSource>(first, second);
      return raced;
  }
  /// <summary>
  /// Array (params) Amb entry point; forwards the array to the enumerable <c>Amb_</c> helper.
  /// </summary>
  /// <param name="sources">Candidate sequences.</param>
  /// <returns>The observable the helper builds for the candidates.</returns>
  public virtual IObservable<TSource> Amb<TSource>(params IObservable<TSource>[] sources)
  {
      IEnumerable<IObservable<TSource>> candidates = sources;
      return Amb_<TSource>(candidates);
  }
  /// <summary>
  /// Enumerable Amb entry point; forwards the collection to the enumerable <c>Amb_</c> helper.
  /// </summary>
  /// <param name="sources">Candidate sequences.</param>
  /// <returns>The observable the helper builds for the candidates.</returns>
  public virtual IObservable<TSource> Amb<TSource>(IEnumerable<IObservable<TSource>> sources)
  {
      IObservable<TSource> raced = Amb_<TSource>(sources);
      return raced;
  }
  /// <summary>
  /// Folds the candidates into a chain of pairwise Amb calls, starting from
  /// <c>Observable.Never</c> as the seed (so an empty collection yields that seed).
  /// </summary>
  /// <param name="sources">Candidate sequences, enumerated eagerly by the fold.</param>
  /// <returns>The accumulated Amb chain.</returns>
  private static IObservable<TSource> Amb_<TSource>(IEnumerable<IObservable<TSource>> sources)
  {
      var seed = Observable.Never<TSource>();
      Func<IObservable<TSource>, IObservable<TSource>, IObservable<TSource>> race =
          (winnerSoFar, candidate) => winnerSoFar.Amb(candidate);

      return sources.Aggregate(seed, race);
  }
  /// <summary>
  /// Pairwise Amb. With NO_PERF undefined this constructs the <c>Amb</c> operator object.
  /// The NO_PERF fallback subscribes to both sources; the first one to deliver any
  /// notification (value, error or completion) is recorded as the winner, the other
  /// subscription is disposed, and only the winner's notifications reach the observer.
  /// </summary>
  /// <param name="leftSource">First contender.</param>
  /// <param name="rightSource">Second contender.</param>
  /// <returns>An observable relaying the winning source.</returns>
  private static IObservable<TSource> Amb_<TSource>(IObservable<TSource> leftSource, IObservable<TSource> rightSource)
  {
  #if !NO_PERF
      return new Amb<TSource>(leftSource, rightSource);
  #else
      return new AnonymousObservable<TSource>(observer =>
      {
          var leftSubscription = new SingleAssignmentDisposable();
          var rightSubscription = new SingleAssignmentDisposable();

          var gate = new object();
          var winner = AmbState.Neither;

          var left = new AmbObserver<TSource>();
          var right = new AmbObserver<TSource>();

          // Claims the win for the left side if nobody has won yet; reports whether left is the winner.
          Func<bool> leftWins = () =>
          {
              if (winner == AmbState.Neither)
              {
                  winner = AmbState.Left;
                  rightSubscription.Dispose();
                  // From now on the left relay forwards straight to the observer.
                  left.Observer = observer;
              }

              return winner == AmbState.Left;
          };

          // Mirror image of leftWins for the right side.
          Func<bool> rightWins = () =>
          {
              if (winner == AmbState.Neither)
              {
                  winner = AmbState.Right;
                  leftSubscription.Dispose();
                  right.Observer = observer;
              }

              return winner == AmbState.Right;
          };

          // Until a winner is chosen both relays go through observers synchronized on the same gate.
          left.Observer = Observer.Synchronize(Observer.Create<TSource>(
              x =>
              {
                  if (leftWins())
                  {
                      observer.OnNext(x);
                  }
              },
              ex =>
              {
                  if (leftWins())
                  {
                      observer.OnError(ex);
                  }
              },
              () =>
              {
                  if (leftWins())
                  {
                      observer.OnCompleted();
                  }
              }
          ), gate);

          right.Observer = Observer.Synchronize(Observer.Create<TSource>(
              x =>
              {
                  if (rightWins())
                  {
                      observer.OnNext(x);
                  }
              },
              ex =>
              {
                  if (rightWins())
                  {
                      observer.OnError(ex);
                  }
              },
              () =>
              {
                  if (rightWins())
                  {
                      observer.OnCompleted();
                  }
              }
          ), gate);

          leftSubscription.Disposable = leftSource.Subscribe(left);
          rightSubscription.Disposable = rightSource.Subscribe(right);

          return new CompositeDisposable(leftSubscription, rightSubscription);
      });
  #endif
  }
  126. #if NO_PERF
  /// <summary>
  /// Observer that relays every notification to a replaceable target observer.
  /// </summary>
  class AmbObserver<TSource> : IObserver<TSource>
  {
      /// <summary>Current relay target; it is read again for each notification.</summary>
      public virtual IObserver<TSource> Observer { get; set; }

      /// <summary>Relays a value to the current target.</summary>
      public virtual void OnNext(TSource value)
      {
          var target = Observer;
          target.OnNext(value);
      }

      /// <summary>Relays an error to the current target.</summary>
      public virtual void OnError(Exception error)
      {
          var target = Observer;
          target.OnError(error);
      }

      /// <summary>Relays completion to the current target.</summary>
      public virtual void OnCompleted()
      {
          var target = Observer;
          target.OnCompleted();
      }
  }
  /// <summary>
  /// Records which side, if any, has won the Amb race in the NO_PERF fallback.
  /// </summary>
  enum AmbState
  {
      /// <summary>The left source won.</summary>
      Left = 0,

      /// <summary>The right source won.</summary>
      Right = 1,

      /// <summary>No source has produced a notification yet.</summary>
      Neither = 2
  }
  149. #endif
  150. #endregion
  151. #region + Buffer +
  /// <summary>
  /// Buffer driven by a closing-selector factory. With NO_PERF undefined this constructs the
  /// <c>Buffer</c> operator object; otherwise each window from <c>Window</c> is turned into a list
  /// through <c>SelectMany(ToList)</c>.
  /// </summary>
  /// <param name="source">Sequence whose elements are buffered.</param>
  /// <param name="bufferClosingSelector">Factory passed through to the operator / <c>Window</c>.</param>
  /// <returns>A sequence of buffers.</returns>
  public virtual IObservable<IList<TSource>> Buffer<TSource, TBufferClosing>(IObservable<TSource> source, Func<IObservable<TBufferClosing>> bufferClosingSelector)
  {
  #if !NO_PERF
      IObservable<IList<TSource>> buffers = new Buffer<TSource, TBufferClosing>(source, bufferClosingSelector);
      return buffers;
  #else
      var windows = source.Window(bufferClosingSelector);
      return windows.SelectMany(ToList);
  #endif
  }
  /// <summary>
  /// Buffer with explicit openings and per-opening closing selector; implemented as
  /// <c>Window</c> followed by <c>SelectMany(ToList)</c>.
  /// </summary>
  /// <param name="source">Sequence whose elements are buffered.</param>
  /// <param name="bufferOpenings">Sequence passed to <c>Window</c> as the openings.</param>
  /// <param name="bufferClosingSelector">Selector passed to <c>Window</c> as the closing selector.</param>
  /// <returns>A sequence of buffers.</returns>
  public virtual IObservable<IList<TSource>> Buffer<TSource, TBufferOpening, TBufferClosing>(IObservable<TSource> source, IObservable<TBufferOpening> bufferOpenings, Func<TBufferOpening, IObservable<TBufferClosing>> bufferClosingSelector)
  {
      var windows = source.Window(bufferOpenings, bufferClosingSelector);
      return windows.SelectMany(ToList);
  }
  /// <summary>
  /// Buffer split by a boundary sequence. With NO_PERF undefined this constructs the
  /// <c>Buffer</c> operator object; otherwise it is <c>Window</c> followed by <c>SelectMany(ToList)</c>.
  /// </summary>
  /// <param name="source">Sequence whose elements are buffered.</param>
  /// <param name="bufferBoundaries">Boundary sequence passed through to the operator / <c>Window</c>.</param>
  /// <returns>A sequence of buffers.</returns>
  public virtual IObservable<IList<TSource>> Buffer<TSource, TBufferBoundary>(IObservable<TSource> source, IObservable<TBufferBoundary> bufferBoundaries)
  {
  #if !NO_PERF
      IObservable<IList<TSource>> buffers = new Buffer<TSource, TBufferBoundary>(source, bufferBoundaries);
      return buffers;
  #else
      var windows = source.Window(bufferBoundaries);
      return windows.SelectMany(ToList);
  #endif
  }
  172. #endregion
  173. #region + Catch +
  /// <summary>
  /// Typed Catch. With NO_PERF undefined this constructs the <c>Catch</c> operator object.
  /// The fallback relays the source; when the source fails with an exception assignable to
  /// <typeparamref name="TException"/>, the handler's sequence is subscribed in its place.
  /// Other exceptions, and exceptions thrown by the handler itself, go to the observer's OnError.
  /// </summary>
  /// <param name="source">Sequence to relay.</param>
  /// <param name="handler">Produces the continuation sequence for a matching exception.</param>
  /// <returns>The guarded sequence.</returns>
  public virtual IObservable<TSource> Catch<TSource, TException>(IObservable<TSource> source, Func<TException, IObservable<TSource>> handler) where TException : Exception
  {
  #if !NO_PERF
      return new Catch<TSource, TException>(source, handler);
  #else
      return new AnonymousObservable<TSource>(observer =>
      {
          var subscription = new SerialDisposable();

          var sourceSubscription = new SingleAssignmentDisposable();
          subscription.Disposable = sourceSubscription;

          sourceSubscription.Disposable = source.Subscribe(
              observer.OnNext,
              exception =>
              {
                  var matched = exception as TException;
                  if (matched == null)
                  {
                      // Not the exception type this operator handles.
                      observer.OnError(exception);
                      return;
                  }

                  IObservable<TSource> continuation;
                  try
                  {
                      continuation = handler(matched);
                  }
                  catch (Exception handlerError)
                  {
                      observer.OnError(handlerError);
                      return;
                  }

                  // Swap the serial slot over to the continuation's subscription.
                  var continuationSubscription = new SingleAssignmentDisposable();
                  subscription.Disposable = continuationSubscription;
                  continuationSubscription.Disposable = continuation.Subscribe(observer);
              },
              observer.OnCompleted);

          return subscription;
      });
  #endif
  }
  /// <summary>
  /// Two-source Catch; packs both sequences into an array and forwards to <c>Catch_</c>.
  /// </summary>
  /// <param name="first">First array element.</param>
  /// <param name="second">Second array element.</param>
  /// <returns>The observable built by <c>Catch_</c>.</returns>
  public virtual IObservable<TSource> Catch<TSource>(IObservable<TSource> first, IObservable<TSource> second)
  {
      var pair = new[] { first, second };
      return Catch_<TSource>(pair);
  }
  /// <summary>
  /// Array (params) Catch; forwards the array to <c>Catch_</c>.
  /// </summary>
  /// <param name="sources">Sequences to try.</param>
  /// <returns>The observable built by <c>Catch_</c>.</returns>
  public virtual IObservable<TSource> Catch<TSource>(params IObservable<TSource>[] sources)
  {
      IObservable<TSource> guarded = Catch_<TSource>(sources);
      return guarded;
  }
  /// <summary>
  /// Enumerable Catch; forwards the collection to <c>Catch_</c>.
  /// </summary>
  /// <param name="sources">Sequences to try.</param>
  /// <returns>The observable built by <c>Catch_</c>.</returns>
  public virtual IObservable<TSource> Catch<TSource>(IEnumerable<IObservable<TSource>> sources)
  {
      IObservable<TSource> guarded = Catch_<TSource>(sources);
      return guarded;
  }
  /// <summary>
  /// Sequential Catch over a collection. With NO_PERF undefined this constructs the <c>Catch</c>
  /// operator object. The fallback subscribes to one source at a time: an error from the current
  /// source is remembered and the next source is tried; a completion ends the result. When the
  /// enumerator runs out, the last remembered error is reported, or completion if there was none.
  /// A failure while advancing the enumerator is reported through OnError.
  /// </summary>
  /// <param name="sources">Sequences to try in enumeration order.</param>
  /// <returns>The guarded sequence.</returns>
  private static IObservable<TSource> Catch_<TSource>(IEnumerable<IObservable<TSource>> sources)
  {
  #if !NO_PERF
      return new Catch<TSource>(sources);
  #else
      return new AnonymousObservable<TSource>(observer =>
      {
          var gate = new AsyncLock();
          var isDisposed = false;
          var enumerator = sources.GetEnumerator();
          var subscription = new SerialDisposable();
          Exception lastException = null;

          var cancelable = SchedulerDefaults.TailRecursion.Schedule(self => gate.Wait(() =>
          {
              if (isDisposed)
              {
                  return;
              }

              IObservable<TSource> next = null;
              var movedNext = false;
              Exception enumerationError = null;

              try
              {
                  movedNext = enumerator.MoveNext();
                  if (movedNext)
                  {
                      next = enumerator.Current;
                  }
                  else
                  {
                      enumerator.Dispose();
                  }
              }
              catch (Exception error)
              {
                  enumerationError = error;
                  enumerator.Dispose();
              }

              if (enumerationError != null)
              {
                  observer.OnError(enumerationError);
                  return;
              }

              if (!movedNext)
              {
                  // Out of sources: surface the most recent failure, if any.
                  if (lastException == null)
                  {
                      observer.OnCompleted();
                  }
                  else
                  {
                      observer.OnError(lastException);
                  }

                  return;
              }

              var current = new SingleAssignmentDisposable();
              subscription.Disposable = current;
              current.Disposable = next.Subscribe(
                  observer.OnNext,
                  failure =>
                  {
                      // Remember the failure and move on to the next source.
                      lastException = failure;
                      self();
                  },
                  observer.OnCompleted);
          }));

          // Disposal of the enumerator is queued on the same AsyncLock as the iteration work.
          var enumeratorCleanup = Disposable.Create(() => gate.Wait(() =>
          {
              enumerator.Dispose();
              isDisposed = true;
          }));

          return new CompositeDisposable(subscription, cancelable, enumeratorCleanup);
      });
  #endif
  }
  287. #endregion
  288. #region + CombineLatest +
  /// <summary>
  /// Two-source CombineLatest. With NO_PERF undefined this constructs the <c>CombineLatest</c>
  /// operator object. The fallback remembers the latest value of each side and, once both sides
  /// have produced a value, emits <paramref name="resultSelector"/> applied to the latest pair on
  /// every new value. It completes when both sides are done, or when one side produces a value
  /// while the other side has completed without ever producing one. Selector exceptions and
  /// source errors go to OnError.
  /// </summary>
  /// <param name="first">Left source.</param>
  /// <param name="second">Right source.</param>
  /// <param name="resultSelector">Combines the latest left and right values.</param>
  /// <returns>The combined sequence.</returns>
  public virtual IObservable<TResult> CombineLatest<TFirst, TSecond, TResult>(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
  {
  #if !NO_PERF
      return new CombineLatest<TFirst, TSecond, TResult>(first, second, resultSelector);
  #else
      return new AnonymousObservable<TResult>(observer =>
      {
          var hasFirst = false;
          var hasSecond = false;
          var latestFirst = default(TFirst);
          var latestSecond = default(TSecond);
          var firstDone = false;
          var secondDone = false;

          var firstSubscription = new SingleAssignmentDisposable();
          var secondSubscription = new SingleAssignmentDisposable();

          // Both sides are synchronized on this gate, so the shared state above is touched by one side at a time.
          var gate = new object();

          // Applies the selector to the latest pair and emits the result (or the selector's exception).
          Action publish = () =>
          {
              TResult combined;
              try
              {
                  combined = resultSelector(latestFirst, latestSecond);
              }
              catch (Exception ex)
              {
                  observer.OnError(ex);
                  return;
              }

              observer.OnNext(combined);
          };

          firstSubscription.Disposable = first.Synchronize(gate).Subscribe(
              value =>
              {
                  hasFirst = true;
                  latestFirst = value;

                  if (hasSecond)
                  {
                      publish();
                  }
                  else if (secondDone)
                  {
                      // The other side finished without a value: no pair can ever be formed.
                      observer.OnCompleted();
                  }
              },
              observer.OnError,
              () =>
              {
                  firstDone = true;
                  if (secondDone)
                  {
                      observer.OnCompleted();
                  }
              }
          );

          secondSubscription.Disposable = second.Synchronize(gate).Subscribe(
              value =>
              {
                  hasSecond = true;
                  latestSecond = value;

                  if (hasFirst)
                  {
                      publish();
                  }
                  else if (firstDone)
                  {
                      observer.OnCompleted();
                  }
              },
              observer.OnError,
              () =>
              {
                  secondDone = true;
                  if (firstDone)
                  {
                      observer.OnCompleted();
                  }
              }
          );

          return new CompositeDisposable(firstSubscription, secondSubscription);
      });
  #endif
  }
  381. #if !NO_PERF
  382. /* The following code is generated by a tool checked in to $/.../Source/Tools/CodeGenerators. */
  383. #region CombineLatest auto-generated code (6/10/2012 7:25:03 PM)
  /// <summary>
  /// CombineLatest for 3 sources: constructs the 3-ary <c>CombineLatest</c> operator object
  /// from the sources and <paramref name="resultSelector"/>.
  /// </summary>
  public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, Func<TSource1, TSource2, TSource3, TResult> resultSelector)
  {
      IObservable<TResult> combined = new CombineLatest<TSource1, TSource2, TSource3, TResult>(
          source1, source2, source3, resultSelector);
      return combined;
  }
  /// <summary>
  /// CombineLatest for 4 sources: constructs the 4-ary <c>CombineLatest</c> operator object
  /// from the sources and <paramref name="resultSelector"/>.
  /// </summary>
  public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, Func<TSource1, TSource2, TSource3, TSource4, TResult> resultSelector)
  {
      IObservable<TResult> combined = new CombineLatest<TSource1, TSource2, TSource3, TSource4, TResult>(
          source1, source2, source3, source4, resultSelector);
      return combined;
  }
  392. #if !NO_LARGEARITY
  /// <summary>
  /// CombineLatest for 5 sources: constructs the 5-ary <c>CombineLatest</c> operator object
  /// from the sources and <paramref name="resultSelector"/>.
  /// </summary>
  public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TResult> resultSelector)
  {
      IObservable<TResult> combined = new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TResult>(
          source1, source2, source3, source4, source5, resultSelector);
      return combined;
  }
  /// <summary>
  /// CombineLatest for 6 sources: constructs the 6-ary <c>CombineLatest</c> operator object
  /// from the sources and <paramref name="resultSelector"/>.
  /// </summary>
  public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TResult> resultSelector)
  {
      IObservable<TResult> combined = new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TResult>(
          source1, source2, source3, source4, source5, source6, resultSelector);
      return combined;
  }
  /// <summary>
  /// CombineLatest for 7 sources: constructs the 7-ary <c>CombineLatest</c> operator object
  /// from the sources and <paramref name="resultSelector"/>.
  /// </summary>
  public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TResult> resultSelector)
  {
      IObservable<TResult> combined = new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TResult>(
          source1, source2, source3, source4, source5, source6, source7, resultSelector);
      return combined;
  }
  /// <summary>
  /// CombineLatest for 8 sources: constructs the 8-ary <c>CombineLatest</c> operator object
  /// from the sources and <paramref name="resultSelector"/>.
  /// </summary>
  public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult> resultSelector)
  {
      IObservable<TResult> combined = new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8, resultSelector);
      return combined;
  }
  /// <summary>
  /// CombineLatest for 9 sources: constructs the 9-ary <c>CombineLatest</c> operator object
  /// from the sources and <paramref name="resultSelector"/>.
  /// </summary>
  public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TResult> resultSelector)
  {
      IObservable<TResult> combined = new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8, source9, resultSelector);
      return combined;
  }
  /// <summary>
  /// CombineLatest for 10 sources: constructs the 10-ary <c>CombineLatest</c> operator object
  /// from the sources and <paramref name="resultSelector"/>.
  /// </summary>
  public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TResult> resultSelector)
  {
      IObservable<TResult> combined = new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, resultSelector);
      return combined;
  }
  /// <summary>
  /// CombineLatest for 11 sources: constructs the 11-ary <c>CombineLatest</c> operator object
  /// from the sources and <paramref name="resultSelector"/>.
  /// </summary>
  public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TResult> resultSelector)
  {
      IObservable<TResult> combined = new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, resultSelector);
      return combined;
  }
  /// <summary>
  /// CombineLatest for 12 sources: constructs the 12-ary <c>CombineLatest</c> operator object
  /// from the sources and <paramref name="resultSelector"/>.
  /// </summary>
  public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TResult> resultSelector)
  {
      IObservable<TResult> combined = new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, resultSelector);
      return combined;
  }
  /// <summary>
  /// CombineLatest for 13 sources: constructs the 13-ary <c>CombineLatest</c> operator object
  /// from the sources and <paramref name="resultSelector"/>.
  /// </summary>
  public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12, IObservable<TSource13> source13, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TResult> resultSelector)
  {
      IObservable<TResult> combined = new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, resultSelector);
      return combined;
  }
  /// <summary>
  /// CombineLatest for 14 sources: constructs the 14-ary <c>CombineLatest</c> operator object
  /// from the sources and <paramref name="resultSelector"/>.
  /// </summary>
  public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12, IObservable<TSource13> source13, IObservable<TSource14> source14, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TResult> resultSelector)
  {
      IObservable<TResult> combined = new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, source14, resultSelector);
      return combined;
  }
  /// <summary>
  /// CombineLatest for 15 sources: constructs the 15-ary <c>CombineLatest</c> operator object
  /// from the sources and <paramref name="resultSelector"/>.
  /// </summary>
  public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12, IObservable<TSource13> source13, IObservable<TSource14> source14, IObservable<TSource15> source15, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TResult> resultSelector)
  {
      IObservable<TResult> combined = new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, source14, source15, resultSelector);
      return combined;
  }
  /// <summary>
  /// CombineLatest for 16 sources: constructs the 16-ary <c>CombineLatest</c> operator object
  /// from the sources and <paramref name="resultSelector"/>.
  /// </summary>
  public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12, IObservable<TSource13> source13, IObservable<TSource14> source14, IObservable<TSource15> source15, IObservable<TSource16> source16, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16, TResult> resultSelector)
  {
      IObservable<TResult> combined = new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16, TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, source14, source15, source16, resultSelector);
      return combined;
  }
  441. #endif
  442. #endregion
  443. #endif
  /// <summary>
  /// Collection CombineLatest with a selector; forwards to <c>CombineLatest_</c>.
  /// </summary>
  /// <param name="sources">Sequences to combine.</param>
  /// <param name="resultSelector">Maps the list of latest values to a result.</param>
  /// <returns>The observable built by <c>CombineLatest_</c>.</returns>
  public virtual IObservable<TResult> CombineLatest<TSource, TResult>(IEnumerable<IObservable<TSource>> sources, Func<IList<TSource>, TResult> resultSelector)
  {
      IObservable<TResult> combined = CombineLatest_<TSource, TResult>(sources, resultSelector);
      return combined;
  }
  /// <summary>
  /// Collection CombineLatest without a selector; forwards to <c>CombineLatest_</c> with a
  /// selector that copies the list of latest values via <c>ToList()</c>.
  /// </summary>
  /// <param name="sources">Sequences to combine.</param>
  /// <returns>A sequence of copied value lists.</returns>
  public virtual IObservable<IList<TSource>> CombineLatest<TSource>(IEnumerable<IObservable<TSource>> sources)
  {
      Func<IList<TSource>, IList<TSource>> snapshot = latest => latest.ToList();
      return CombineLatest_<TSource, IList<TSource>>(sources, snapshot);
  }
  /// <summary>
  /// Array (params) CombineLatest without a selector; forwards to <c>CombineLatest_</c> with a
  /// selector that copies the list of latest values via <c>ToList()</c>.
  /// </summary>
  /// <param name="sources">Sequences to combine.</param>
  /// <returns>A sequence of copied value lists.</returns>
  public virtual IObservable<IList<TSource>> CombineLatest<TSource>(params IObservable<TSource>[] sources)
  {
      Func<IList<TSource>, IList<TSource>> snapshot = latest => latest.ToList();
      return CombineLatest_<TSource, IList<TSource>>(sources, snapshot);
  }
  /// <summary>
  /// N-ary CombineLatest over a collection. With NO_PERF undefined this constructs the
  /// <c>CombineLatest</c> operator object. The fallback snapshots the sources into an array,
  /// keeps the latest value per source, and once every source has produced a value it emits
  /// <paramref name="resultSelector"/> applied to a read-only wrapper over the value list on each
  /// new value. It completes when all sources are done, or when a value arrives before all
  /// sources have a value while every other source is already done.
  /// </summary>
  /// <param name="sources">Sequences to combine; enumerated once per subscription.</param>
  /// <param name="resultSelector">Maps the list of latest values to a result.</param>
  /// <returns>The combined sequence.</returns>
  private static IObservable<TResult> CombineLatest_<TSource, TResult>(IEnumerable<IObservable<TSource>> sources, Func<IList<TSource>, TResult> resultSelector)
  {
  #if !NO_PERF
      return new CombineLatest<TSource, TResult>(sources, resultSelector);
  #else
      return new AnonymousObservable<TResult>(observer =>
      {
          var inputs = sources.ToArray();
          var count = inputs.Length;

          var hasValue = new bool[count];
          var allHaveValue = false;
          var isDone = new bool[count];

          // One slot per source, pre-filled with default(TSource).
          var latest = new List<TSource>(new TSource[count]);

          Action<int> onValue = index =>
          {
              hasValue[index] = true;

              if (!allHaveValue)
              {
                  allHaveValue = hasValue.All(Stubs<bool>.I);
              }

              if (allHaveValue)
              {
                  TResult combined;
                  try
                  {
                      combined = resultSelector(new ReadOnlyCollection<TSource>(latest));
                  }
                  catch (Exception ex)
                  {
                      observer.OnError(ex);
                      return;
                  }

                  observer.OnNext(combined);
              }
              else if (isDone.Where((finished, other) => other != index).All(Stubs<bool>.I))
              {
                  // Every other source is done, so the missing values can never arrive.
                  observer.OnCompleted();
              }
          };

          Action<int> onDone = index =>
          {
              isDone[index] = true;
              if (isDone.All(Stubs<bool>.I))
              {
                  observer.OnCompleted();
              }
          };

          var subscriptions = new SingleAssignmentDisposable[count];
          var gate = new object();

          for (var i = 0; i < count; i++)
          {
              // Fresh copy per iteration so each callback closes over its own index.
              var slot = i;

              var handle = new SingleAssignmentDisposable();
              handle.Disposable = inputs[slot].Synchronize(gate).Subscribe(
                  value =>
                  {
                      latest[slot] = value;
                      onValue(slot);
                  },
                  observer.OnError,
                  () =>
                  {
                      onDone(slot);
                  }
              );
              subscriptions[slot] = handle;
          }

          return new CompositeDisposable(subscriptions);
      });
  #endif
  }
  528. #endregion
  529. #region + Concat +
  /// <summary>
  /// Two-source Concat; packs both sequences into an array and forwards to <c>Concat_</c>.
  /// </summary>
  /// <param name="first">First array element.</param>
  /// <param name="second">Second array element.</param>
  /// <returns>The observable built by <c>Concat_</c>.</returns>
  public virtual IObservable<TSource> Concat<TSource>(IObservable<TSource> first, IObservable<TSource> second)
  {
      var pair = new[] { first, second };
      return Concat_<TSource>(pair);
  }
  /// <summary>
  /// Array (params) Concat; forwards the array to <c>Concat_</c>.
  /// </summary>
  /// <param name="sources">Sequences to concatenate.</param>
  /// <returns>The observable built by <c>Concat_</c>.</returns>
  public virtual IObservable<TSource> Concat<TSource>(params IObservable<TSource>[] sources)
  {
      IObservable<TSource> joined = Concat_<TSource>(sources);
      return joined;
  }
  /// <summary>
  /// Enumerable Concat; forwards the collection to <c>Concat_</c>.
  /// </summary>
  /// <param name="sources">Sequences to concatenate.</param>
  /// <returns>The observable built by <c>Concat_</c>.</returns>
  public virtual IObservable<TSource> Concat<TSource>(IEnumerable<IObservable<TSource>> sources)
  {
      IObservable<TSource> joined = Concat_<TSource>(sources);
      return joined;
  }
  /// <summary>
  /// Sequential Concat over a collection. With NO_PERF undefined this constructs the
  /// <c>Concat</c> operator object. The fallback subscribes to one source at a time: completion
  /// of the current source advances to the next one, an error ends the result, and running out
  /// of sources completes it. A failure while advancing the enumerator is reported through OnError.
  /// </summary>
  /// <param name="sources">Sequences to concatenate in enumeration order.</param>
  /// <returns>The concatenated sequence.</returns>
  private static IObservable<TSource> Concat_<TSource>(IEnumerable<IObservable<TSource>> sources)
  {
  #if !NO_PERF
      return new Concat<TSource>(sources);
  #else
      return new AnonymousObservable<TSource>(observer =>
      {
          var isDisposed = false;
          var enumerator = sources.GetEnumerator();
          var subscription = new SerialDisposable();
          var gate = new AsyncLock();

          var cancelable = SchedulerDefaults.TailRecursion.Schedule(self => gate.Wait(() =>
          {
              if (isDisposed)
              {
                  return;
              }

              IObservable<TSource> next = null;
              var movedNext = false;
              Exception enumerationError = null;

              try
              {
                  movedNext = enumerator.MoveNext();
                  if (movedNext)
                  {
                      next = enumerator.Current;
                  }
                  else
                  {
                      enumerator.Dispose();
                  }
              }
              catch (Exception error)
              {
                  enumerationError = error;
                  enumerator.Dispose();
              }

              if (enumerationError != null)
              {
                  observer.OnError(enumerationError);
                  return;
              }

              if (!movedNext)
              {
                  observer.OnCompleted();
                  return;
              }

              var current = new SingleAssignmentDisposable();
              subscription.Disposable = current;
              // Completion of the current source re-enters this action to move on.
              current.Disposable = next.Subscribe(observer.OnNext, observer.OnError, self);
          }));

          // Disposal of the enumerator is queued on the same AsyncLock as the iteration work.
          var enumeratorCleanup = Disposable.Create(() => gate.Wait(() =>
          {
              enumerator.Dispose();
              isDisposed = true;
          }));

          return new CompositeDisposable(subscription, cancelable, enumeratorCleanup);
      });
  #endif
  }
  /// <summary>
  /// Concat over an observable of sequences; forwards to the observable <c>Concat_</c> helper.
  /// </summary>
  /// <param name="sources">Outer sequence of inner sequences.</param>
  /// <returns>The observable built by the helper.</returns>
  public virtual IObservable<TSource> Concat<TSource>(IObservable<IObservable<TSource>> sources)
  {
      IObservable<TSource> joined = Concat_<TSource>(sources);
      return joined;
  }
  602. #if !NO_TPL
  /// <summary>
  /// Concat over an observable of tasks; each task is projected with
  /// <c>TaskObservableExtensions.ToObservable</c> and the result goes to the observable
  /// <c>Concat_</c> helper.
  /// </summary>
  /// <param name="sources">Outer sequence of tasks.</param>
  /// <returns>The observable built by the helper.</returns>
  public virtual IObservable<TSource> Concat<TSource>(IObservable<Task<TSource>> sources)
  {
      var inners = Select(sources, TaskObservableExtensions.ToObservable);
      return Concat_<TSource>(inners);
  }
  607. #endif
  /// <summary>
  /// Concat over an observable of sequences, expressed as <c>Merge</c> with a concurrency
  /// argument of 1.
  /// </summary>
  /// <param name="sources">Outer sequence of inner sequences.</param>
  /// <returns>The observable returned by <c>Merge</c>.</returns>
  private IObservable<TSource> Concat_<TSource>(IObservable<IObservable<TSource>> sources)
  {
      const int oneAtATime = 1;
      return Merge(sources, oneAtATime);
  }
  612. #endregion
  613. #region + Merge +
  /// <summary>
  /// Merge over an observable of sequences; forwards to the unbounded <c>Merge_</c> helper.
  /// </summary>
  /// <param name="sources">Outer sequence of inner sequences.</param>
  /// <returns>The observable built by <c>Merge_</c>.</returns>
  public virtual IObservable<TSource> Merge<TSource>(IObservable<IObservable<TSource>> sources)
  {
      IObservable<TSource> merged = Merge_<TSource>(sources);
      return merged;
  }
  618. #if !NO_TPL
  /// <summary>
  /// Merge over an observable of tasks. With NO_PERF undefined this constructs the <c>Merge</c>
  /// operator object directly; otherwise each task is projected with
  /// <c>TaskObservableExtensions.ToObservable</c> and the result goes to <c>Merge_</c>.
  /// </summary>
  /// <param name="sources">Outer sequence of tasks.</param>
  /// <returns>The merged sequence.</returns>
  public virtual IObservable<TSource> Merge<TSource>(IObservable<Task<TSource>> sources)
  {
  #if !NO_PERF
      IObservable<TSource> merged = new Merge<TSource>(sources);
      return merged;
  #else
      var inners = Select(sources, TaskObservableExtensions.ToObservable);
      return Merge_<TSource>(inners);
  #endif
  }
  627. #endif
  /// <summary>
  /// Merge over an observable of sequences with a concurrency argument; forwards to the
  /// bounded <c>Merge_</c> helper.
  /// </summary>
  /// <param name="sources">Outer sequence of inner sequences.</param>
  /// <param name="maxConcurrent">Passed through to the helper unchanged.</param>
  /// <returns>The observable built by <c>Merge_</c>.</returns>
  public virtual IObservable<TSource> Merge<TSource>(IObservable<IObservable<TSource>> sources, int maxConcurrent)
  {
      IObservable<TSource> merged = Merge_<TSource>(sources, maxConcurrent);
      return merged;
  }
  /// <summary>
  /// Bounded Merge over a collection; the collection is converted with <c>ToObservable</c> on
  /// <c>SchedulerDefaults.ConstantTimeOperations</c> and handed to the bounded <c>Merge_</c> helper.
  /// </summary>
  /// <param name="sources">Sequences to merge.</param>
  /// <param name="maxConcurrent">Passed through to the helper unchanged.</param>
  /// <returns>The observable built by <c>Merge_</c>.</returns>
  public virtual IObservable<TSource> Merge<TSource>(IEnumerable<IObservable<TSource>> sources, int maxConcurrent)
  {
      var outer = sources.ToObservable(SchedulerDefaults.ConstantTimeOperations);
      return Merge_<TSource>(outer, maxConcurrent);
  }
  /// <summary>
  /// Bounded Merge over a collection; the collection is converted with <c>ToObservable</c> on
  /// the supplied scheduler and handed to the bounded <c>Merge_</c> helper.
  /// </summary>
  /// <param name="sources">Sequences to merge.</param>
  /// <param name="maxConcurrent">Passed through to the helper unchanged.</param>
  /// <param name="scheduler">Scheduler given to <c>ToObservable</c>.</param>
  /// <returns>The observable built by <c>Merge_</c>.</returns>
  public virtual IObservable<TSource> Merge<TSource>(IEnumerable<IObservable<TSource>> sources, int maxConcurrent, IScheduler scheduler)
  {
      var outer = sources.ToObservable(scheduler);
      return Merge_<TSource>(outer, maxConcurrent);
  }
  /// <summary>
  /// Two-source Merge; the pair is converted with <c>ToObservable</c> on
  /// <c>SchedulerDefaults.ConstantTimeOperations</c> and handed to the unbounded <c>Merge_</c> helper.
  /// </summary>
  /// <param name="first">First sequence.</param>
  /// <param name="second">Second sequence.</param>
  /// <returns>The observable built by <c>Merge_</c>.</returns>
  public virtual IObservable<TSource> Merge<TSource>(IObservable<TSource> first, IObservable<TSource> second)
  {
      var pair = new[] { first, second };
      var outer = pair.ToObservable(SchedulerDefaults.ConstantTimeOperations);
      return Merge_<TSource>(outer);
  }
  /// <summary>
  /// Two-source Merge; the pair is converted with <c>ToObservable</c> on the supplied scheduler
  /// and handed to the unbounded <c>Merge_</c> helper.
  /// </summary>
  /// <param name="first">First sequence.</param>
  /// <param name="second">Second sequence.</param>
  /// <param name="scheduler">Scheduler given to <c>ToObservable</c>.</param>
  /// <returns>The observable built by <c>Merge_</c>.</returns>
  public virtual IObservable<TSource> Merge<TSource>(IObservable<TSource> first, IObservable<TSource> second, IScheduler scheduler)
  {
      var pair = new[] { first, second };
      var outer = pair.ToObservable(scheduler);
      return Merge_<TSource>(outer);
  }
  /// <summary>
  /// Array (params) Merge; the array is converted with <c>ToObservable</c> on
  /// <c>SchedulerDefaults.ConstantTimeOperations</c> and handed to the unbounded <c>Merge_</c> helper.
  /// </summary>
  /// <param name="sources">Sequences to merge.</param>
  /// <returns>The observable built by <c>Merge_</c>.</returns>
  public virtual IObservable<TSource> Merge<TSource>(params IObservable<TSource>[] sources)
  {
      var outer = sources.ToObservable(SchedulerDefaults.ConstantTimeOperations);
      return Merge_<TSource>(outer);
  }
  /// <summary>
  /// Array (params) Merge; the array is converted with <c>ToObservable</c> on the supplied
  /// scheduler and handed to the unbounded <c>Merge_</c> helper.
  /// </summary>
  /// <param name="scheduler">Scheduler given to <c>ToObservable</c>.</param>
  /// <param name="sources">Sequences to merge.</param>
  /// <returns>The observable built by <c>Merge_</c>.</returns>
  public virtual IObservable<TSource> Merge<TSource>(IScheduler scheduler, params IObservable<TSource>[] sources)
  {
      var outer = sources.ToObservable(scheduler);
      return Merge_<TSource>(outer);
  }
  /// <summary>
  /// Enumerable Merge; the collection is converted with <c>ToObservable</c> on
  /// <c>SchedulerDefaults.ConstantTimeOperations</c> and handed to the unbounded <c>Merge_</c> helper.
  /// </summary>
  /// <param name="sources">Sequences to merge.</param>
  /// <returns>The observable built by <c>Merge_</c>.</returns>
  public virtual IObservable<TSource> Merge<TSource>(IEnumerable<IObservable<TSource>> sources)
  {
      var outer = sources.ToObservable(SchedulerDefaults.ConstantTimeOperations);
      return Merge_<TSource>(outer);
  }
  /// <summary>
  /// Enumerable Merge; the collection is converted with <c>ToObservable</c> on the supplied
  /// scheduler and handed to the unbounded <c>Merge_</c> helper.
  /// </summary>
  /// <param name="sources">Sequences to merge.</param>
  /// <param name="scheduler">Scheduler given to <c>ToObservable</c>.</param>
  /// <returns>The observable built by <c>Merge_</c>.</returns>
  public virtual IObservable<TSource> Merge<TSource>(IEnumerable<IObservable<TSource>> sources, IScheduler scheduler)
  {
      var outer = sources.ToObservable(scheduler);
      return Merge_<TSource>(outer);
  }
  /// <summary>
  /// Unbounded Merge. With NO_PERF undefined this constructs the <c>Merge</c> operator object.
  /// The fallback subscribes to every inner sequence as it arrives and relays inner values under
  /// a shared lock. Any error (outer or inner) goes to OnError. Completion is signalled when the
  /// outer sequence has completed and the disposable group holds only the outer subscription.
  /// </summary>
  /// <param name="sources">Outer sequence of inner sequences.</param>
  /// <returns>The merged sequence.</returns>
  private static IObservable<TSource> Merge_<TSource>(IObservable<IObservable<TSource>> sources)
  {
  #if !NO_PERF
      return new Merge<TSource>(sources);
  #else
      return new AnonymousObservable<TSource>(observer =>
      {
          var gate = new object();
          var isStopped = false;

          var outerSubscription = new SingleAssignmentDisposable();
          var group = new CompositeDisposable();
          group.Add(outerSubscription);

          // Terminal notifications are delivered under the same lock as values.
          Action<Exception> fail = error =>
          {
              lock (gate)
              {
                  observer.OnError(error);
              }
          };

          Action complete = () =>
          {
              lock (gate)
              {
                  observer.OnCompleted();
              }
          };

          outerSubscription.Disposable = sources.Subscribe(
              innerSource =>
              {
                  var innerSubscription = new SingleAssignmentDisposable();
                  group.Add(innerSubscription);

                  innerSubscription.Disposable = innerSource.Subscribe(
                      value =>
                      {
                          lock (gate)
                          {
                              observer.OnNext(value);
                          }
                      },
                      fail,
                      () =>
                      {
                          // The removal MUST happen before the check below.
                          group.Remove(innerSubscription);

                          // isStopped must be read before the group count so the outer cannot still be adding inners.
                          if (isStopped && group.Count == 1)
                          {
                              complete();
                          }
                      });
              },
              fail,
              () =>
              {
                  // The flag MUST be set before the count is checked.
                  isStopped = true;
                  if (group.Count == 1)
                  {
                      complete();
                  }
              });

          return group;
      });
  #endif
  }
  // Merge with a cap on the number of concurrently subscribed inner sequences.
  //
  // With !NO_PERF this returns the Merge<TSource> operator object. Otherwise: inner sequences
  // arriving while fewer than `maxConcurrent` are running are subscribed immediately; the rest
  // are queued. When a running inner completes, the next queued one (if any) takes its slot;
  // otherwise the running count drops, and the output completes once the outer sequence has
  // stopped and nothing is running. All observer calls and state changes happen under one lock.
  private static IObservable<TSource> Merge_<TSource>(IObservable<IObservable<TSource>> sources, int maxConcurrent)
  {
#if !NO_PERF
      return new Merge<TSource>(sources, maxConcurrent);
#else
      return new AnonymousObservable<TSource>(observer =>
      {
          var sync = new object();
          var pending = new Queue<IObservable<TSource>>();
          var outerDone = false;
          var subscriptions = new CompositeDisposable();
          var running = 0;

          // Declared first so the lambda can refer to itself when draining the queue.
          Action<IObservable<TSource>> startInner = null;
          startInner = inner =>
          {
              var handle = new SingleAssignmentDisposable();
              subscriptions.Add(handle);
              handle.Disposable = inner.Subscribe(
                  value =>
                  {
                      lock (sync)
                      {
                          observer.OnNext(value);
                      }
                  },
                  error =>
                  {
                      lock (sync)
                      {
                          observer.OnError(error);
                      }
                  },
                  () =>
                  {
                      subscriptions.Remove(handle);
                      lock (sync)
                      {
                          if (pending.Count > 0)
                          {
                              // Hand the freed slot to the next queued sequence; running count is unchanged.
                              var queued = pending.Dequeue();
                              startInner(queued);
                          }
                          else
                          {
                              running--;
                              if (outerDone && running == 0)
                              {
                                  observer.OnCompleted();
                              }
                          }
                      }
                  });
          };

          var outerSubscription = sources.Subscribe(
              inner =>
              {
                  lock (sync)
                  {
                      if (running < maxConcurrent)
                      {
                          running++;
                          startInner(inner);
                      }
                      else
                      {
                          pending.Enqueue(inner);
                      }
                  }
              },
              error =>
              {
                  lock (sync)
                  {
                      observer.OnError(error);
                  }
              },
              () =>
              {
                  lock (sync)
                  {
                      outerDone = true;
                      if (running == 0)
                      {
                          observer.OnCompleted();
                      }
                  }
              });
          subscriptions.Add(outerSubscription);

          return subscriptions;
      });
#endif
  }
  794. #endregion
  795. #region + OnErrorResumeNext +
  // Two-source overload: packs both sequences into an array and defers to OnErrorResumeNext_.
  public virtual IObservable<TSource> OnErrorResumeNext<TSource>(IObservable<TSource> first, IObservable<TSource> second)
  {
      var pair = new[] { first, second };
      return OnErrorResumeNext_<TSource>(pair);
  }
  // Array overload: the array is passed to OnErrorResumeNext_ as an enumerable of sequences.
  public virtual IObservable<TSource> OnErrorResumeNext<TSource>(params IObservable<TSource>[] sources)
  {
      IEnumerable<IObservable<TSource>> sequence = sources;
      return OnErrorResumeNext_<TSource>(sequence);
  }
  // Enumerable overload: defers directly to OnErrorResumeNext_.
  public virtual IObservable<TSource> OnErrorResumeNext<TSource>(IEnumerable<IObservable<TSource>> sources)
  {
      var resumed = OnErrorResumeNext_<TSource>(sources);
      return resumed;
  }
  // Subscribes to the sequences of `sources` one after another, moving to the next one
  // whenever the current one terminates - by completion or by error.
  //
  // With !NO_PERF this returns the OnErrorResumeNext<TSource> operator object. Otherwise a
  // recursive scheduled action advances the enumerator (under an AsyncLock), reports an
  // enumeration failure through OnError, completes when the enumerator is exhausted, and
  // otherwise subscribes to the next sequence with both of its terminal callbacks wired
  // back to the recursion. Disposal disposes the enumerator and sets a flag under the same lock.
  private static IObservable<TSource> OnErrorResumeNext_<TSource>(IEnumerable<IObservable<TSource>> sources)
  {
#if !NO_PERF
      return new OnErrorResumeNext<TSource>(sources);
#else
      return new AnonymousObservable<TSource>(observer =>
      {
          var asyncGate = new AsyncLock();
          var disposed = false;
          var enumerator = sources.GetEnumerator();
          var currentSubscription = new SerialDisposable();

          var loop = SchedulerDefaults.TailRecursion.Schedule(moveNext => asyncGate.Wait(() =>
          {
              if (disposed)
              {
                  return;
              }

              IObservable<TSource> nextSource = null;
              bool advanced = false;
              Exception failure = null;

              try
              {
                  advanced = enumerator.MoveNext();
                  if (advanced)
                  {
                      nextSource = enumerator.Current;
                  }
                  else
                  {
                      enumerator.Dispose();
                  }
              }
              catch (Exception error)
              {
                  failure = error;
                  enumerator.Dispose();
              }

              if (failure != null)
              {
                  observer.OnError(failure);
                  return;
              }

              if (!advanced)
              {
                  observer.OnCompleted();
                  return;
              }

              // Register the holder before subscribing so disposal also reaches a subscription
              // that is still being set up.
              var inner = new SingleAssignmentDisposable();
              currentSubscription.Disposable = inner;
              inner.Disposable = nextSource.Subscribe(observer.OnNext, _ => moveNext(), moveNext);
          }));

          var cleanup = Disposable.Create(() => asyncGate.Wait(() =>
          {
              enumerator.Dispose();
              disposed = true;
          }));

          return new CompositeDisposable(currentSubscription, loop, cleanup);
      });
#endif
  }
  864. #endregion
  865. #region + SkipUntil +
  // Drops values of `source` until `other` produces its first value.
  //
  // With !NO_PERF this returns the SkipUntil operator object. Otherwise both sequences are
  // synchronized on a shared lock object; the first value of `other` opens the gate and disposes
  // the subscription to `other`. Source values and source completion are forwarded only
  // while the gate is open; errors from either sequence are always forwarded.
  public virtual IObservable<TSource> SkipUntil<TSource, TOther>(IObservable<TSource> source, IObservable<TOther> other)
  {
#if !NO_PERF
      return new SkipUntil<TSource, TOther>(source, other);
#else
      return new AnonymousObservable<TSource>(observer =>
      {
          var sourceHandle = new SingleAssignmentDisposable();
          var otherHandle = new SingleAssignmentDisposable();
          var isOpen = false;
          var sync = new object();

          var synchronizedSource = source.Synchronize(sync);
          sourceHandle.Disposable = synchronizedSource.Subscribe(
              value =>
              {
                  if (!isOpen)
                  {
                      return;
                  }
                  observer.OnNext(value);
              },
              observer.OnError, // BREAKING CHANGE - Error propagation was guarded by "other" source in v1.0.10621 (due to materialization).
              () =>
              {
                  if (!isOpen)
                  {
                      return;
                  }
                  observer.OnCompleted();
              });

          var synchronizedOther = other.Synchronize(sync);
          otherHandle.Disposable = synchronizedOther.Subscribe(
              _ =>
              {
                  isOpen = true;
                  otherHandle.Dispose();
              },
              observer.OnError);

          return new CompositeDisposable(sourceHandle, otherHandle);
      });
#endif
  }
  902. #endregion
  903. #region + Switch +
  // Observable-of-observables overload: defers directly to Switch_.
  public virtual IObservable<TSource> Switch<TSource>(IObservable<IObservable<TSource>> sources)
  {
      var switched = Switch_<TSource>(sources);
      return switched;
  }
  908. #if !NO_TPL
  // Task overload: projects every task through TaskObservableExtensions.ToObservable and
  // then defers to Switch_.
  public virtual IObservable<TSource> Switch<TSource>(IObservable<Task<TSource>> sources)
  {
      var taskSequences = Select(sources, TaskObservableExtensions.ToObservable);
      return Switch_<TSource>(taskSequences);
  }
  913. #endif
  // Forwards notifications of the most recently received inner sequence only.
  //
  // With !NO_PERF this returns the Switch<TSource> operator object. Otherwise every inner
  // sequence is stamped with an incrementing id; callbacks of an inner sequence are honoured
  // only while its id is still the latest one. Assigning the new subscription holder to the
  // SerialDisposable replaces the one previously held. The output completes when the outer
  // sequence has stopped and no latest inner sequence is still active.
  private IObservable<TSource> Switch_<TSource>(IObservable<IObservable<TSource>> sources)
  {
#if !NO_PERF
      return new Switch<TSource>(sources);
#else
      return new AnonymousObservable<TSource>(observer =>
      {
          var sync = new object();
          var currentInner = new SerialDisposable();
          var outerDone = false;
          var latestId = 0UL;
          var innerActive = false;

          var outerSubscription = sources.Subscribe(
              inner =>
              {
                  ulong myId;
                  lock (sync)
                  {
                      myId = unchecked(++latestId);
                      innerActive = true;
                  }

                  var holder = new SingleAssignmentDisposable();
                  currentInner.Disposable = holder;
                  holder.Disposable = inner.Subscribe(
                      value =>
                      {
                          lock (sync)
                          {
                              if (latestId == myId)
                              {
                                  observer.OnNext(value);
                              }
                          }
                      },
                      error =>
                      {
                          lock (sync)
                          {
                              if (latestId == myId)
                              {
                                  observer.OnError(error);
                              }
                          }
                      },
                      () =>
                      {
                          lock (sync)
                          {
                              if (latestId != myId)
                              {
                                  return;
                              }

                              innerActive = false;
                              if (outerDone)
                              {
                                  observer.OnCompleted();
                              }
                          }
                      });
              },
              error =>
              {
                  lock (sync)
                  {
                      observer.OnError(error);
                  }
              },
              () =>
              {
                  lock (sync)
                  {
                      outerDone = true;
                      if (!innerActive)
                      {
                          observer.OnCompleted();
                      }
                  }
              });

          return new CompositeDisposable(outerSubscription, currentInner);
      });
#endif
  }
  985. #endregion
  986. #region + TakeUntil +
  // Relays `source` until `other` produces a value.
  //
  // With !NO_PERF this returns the TakeUntil operator object. Otherwise `other` is subscribed
  // first: any value from it completes the observer, any error from it is forwarded. `source`
  // is then subscribed with the observer directly, with a Finally action that disposes the
  // subscription to `other`. Both are synchronized on one shared lock object.
  public virtual IObservable<TSource> TakeUntil<TSource, TOther>(IObservable<TSource> source, IObservable<TOther> other)
  {
#if !NO_PERF
      return new TakeUntil<TSource, TOther>(source, other);
#else
      return new AnonymousObservable<TSource>(observer =>
      {
          var sourceHandle = new SingleAssignmentDisposable();
          var otherHandle = new SingleAssignmentDisposable();
          var sync = new object();

          // COMPAT - Order of Subscribe calls per v1.0.10621
          var synchronizedOther = other.Synchronize(sync);
          otherHandle.Disposable = synchronizedOther.Subscribe(
              _ => observer.OnCompleted(),
              observer.OnError);

          var guardedSource = source.Synchronize(sync).Finally(otherHandle.Dispose);
          sourceHandle.Disposable = guardedSource.Subscribe(observer);

          return new CompositeDisposable(sourceHandle, otherHandle);
      });
#endif
  }
  1010. #endregion
  1011. #region + Window +
  // Splits `source` into consecutive windows; each window is closed when the sequence
  // returned by `windowClosingSelector` terminates normally (it is truncated with Take(1)).
  //
  // With !NO_PERF this returns the Window operator object. Otherwise:
  //   - the first window is emitted before `source` is subscribed;
  //   - source values go to the current window, source termination goes to the current
  //     window and then to the observer;
  //   - when a closing sequence completes, the current window is completed, a fresh one is
  //     created and emitted, and a new closing sequence is requested through the AsyncLock;
  //   - an exception thrown by the selector is reported to the observer only.
  public virtual IObservable<IObservable<TSource>> Window<TSource, TWindowClosing>(IObservable<TSource> source, Func<IObservable<TWindowClosing>> windowClosingSelector)
  {
#if !NO_PERF
      return new Window<TSource, TWindowClosing>(source, windowClosingSelector);
#else
      return new AnonymousObservable<IObservable<TSource>>(observer =>
      {
          var currentWindow = new Subject<TSource>();
          var sync = new object();

          var closingHandle = new SerialDisposable();
          var resources = new CompositeDisposable(2) { closingHandle };
          var refCount = new RefCountDisposable(resources);

          observer.OnNext(currentWindow.AddRef(refCount));

          var sourceObserver = new AnonymousObserver<TSource>(
              value =>
              {
                  lock (sync)
                  {
                      currentWindow.OnNext(value);
                  }
              },
              error =>
              {
                  lock (sync)
                  {
                      currentWindow.OnError(error);
                      observer.OnError(error);
                  }
              },
              () =>
              {
                  lock (sync)
                  {
                      currentWindow.OnCompleted();
                      observer.OnCompleted();
                  }
              });
          resources.Add(source.SubscribeSafe(sourceObserver));

          var closeLock = new AsyncLock();

          // Declared first so the completion callback below can re-schedule it.
          Action scheduleWindowClose = null;
          scheduleWindowClose = () =>
          {
              IObservable<TWindowClosing> closing;
              try
              {
                  closing = windowClosingSelector();
              }
              catch (Exception error)
              {
                  lock (sync)
                  {
                      observer.OnError(error);
                  }
                  return;
              }

              var closingSubscription = new SingleAssignmentDisposable();
              closingHandle.Disposable = closingSubscription;

              var closingObserver = new AnonymousObserver<TWindowClosing>(
                  Stubs<TWindowClosing>.Ignore,
                  error =>
                  {
                      lock (sync)
                      {
                          currentWindow.OnError(error);
                          observer.OnError(error);
                      }
                  },
                  () =>
                  {
                      lock (sync)
                      {
                          currentWindow.OnCompleted();
                          currentWindow = new Subject<TSource>();
                          observer.OnNext(currentWindow.AddRef(refCount));
                      }
                      closeLock.Wait(scheduleWindowClose);
                  });
              closingSubscription.Disposable = closing.Take(1).SubscribeSafe(closingObserver);
          };

          closeLock.Wait(scheduleWindowClose);

          return refCount;
      });
#endif
  }
  // Window driven by explicit openings: expressed as a GroupJoin of `windowOpenings` with
  // `source`, where each opening lasts as long as `windowClosingSelector` says, each source
  // element has an empty duration, and the result selector yields the window itself.
  public virtual IObservable<IObservable<TSource>> Window<TSource, TWindowOpening, TWindowClosing>(IObservable<TSource> source, IObservable<TWindowOpening> windowOpenings, Func<TWindowOpening, IObservable<TWindowClosing>> windowClosingSelector)
  {
      return windowOpenings.GroupJoin(
          source,
          windowClosingSelector,
          element => Observable.Empty<Unit>(),
          (opening, window) => window);
  }
  // Splits `source` into consecutive windows, starting a new window each time
  // `windowBoundaries` produces a value.
  //
  // With !NO_PERF this returns the Window operator object. Otherwise the first window is
  // emitted up front; source values go to the current window; every boundary value completes
  // the current window and emits a new one; an error or completion from either sequence is
  // applied to the current window and then to the observer. All of this runs under one lock.
  public virtual IObservable<IObservable<TSource>> Window<TSource, TWindowBoundary>(IObservable<TSource> source, IObservable<TWindowBoundary> windowBoundaries)
  {
#if !NO_PERF
      return new Window<TSource, TWindowBoundary>(source, windowBoundaries);
#else
      return new AnonymousObservable<IObservable<TSource>>(observer =>
      {
          var currentWindow = new Subject<TSource>();
          var sync = new object();

          var resources = new CompositeDisposable(2);
          var refCount = new RefCountDisposable(resources);

          observer.OnNext(currentWindow.AddRef(refCount));

          var sourceObserver = new AnonymousObserver<TSource>(
              value =>
              {
                  lock (sync)
                  {
                      currentWindow.OnNext(value);
                  }
              },
              error =>
              {
                  lock (sync)
                  {
                      currentWindow.OnError(error);
                      observer.OnError(error);
                  }
              },
              () =>
              {
                  lock (sync)
                  {
                      currentWindow.OnCompleted();
                      observer.OnCompleted();
                  }
              });
          resources.Add(source.SubscribeSafe(sourceObserver));

          var boundaryObserver = new AnonymousObserver<TWindowBoundary>(
              boundary =>
              {
                  lock (sync)
                  {
                      currentWindow.OnCompleted();
                      currentWindow = new Subject<TSource>();
                      observer.OnNext(currentWindow.AddRef(refCount));
                  }
              },
              error =>
              {
                  lock (sync)
                  {
                      currentWindow.OnError(error);
                      observer.OnError(error);
                  }
              },
              () =>
              {
                  lock (sync)
                  {
                      currentWindow.OnCompleted();
                      observer.OnCompleted();
                  }
              });
          resources.Add(windowBoundaries.SubscribeSafe(boundaryObserver));

          return refCount;
      });
#endif
  }
  1166. #endregion
  1167. #region + WithLatestFrom +
  // Wraps both sequences and the result selector in a WithLatestFrom operator object.
  public virtual IObservable<TResult> WithLatestFrom<TFirst, TSecond, TResult>(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
  {
      var combined = new WithLatestFrom<TFirst, TSecond, TResult>(first, second, resultSelector);
      return combined;
  }
  1172. #endregion
  1173. #region + Zip +
  // Pairs up the elements of two sequences in arrival order and projects each pair
  // through `resultSelector`.
  //
  // With !NO_PERF this returns the Zip operator object. Otherwise each side keeps a queue of
  // unmatched values. An arriving value is combined with the head of the opposite queue when
  // one is waiting; if none is waiting and the opposite side is already done, the output
  // completes; otherwise the value is queued. A selector exception is reported through OnError.
  // The output also completes when both sides have completed. Disposal clears both queues.
  public virtual IObservable<TResult> Zip<TFirst, TSecond, TResult>(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
  {
#if !NO_PERF
      return new Zip<TFirst, TSecond, TResult>(first, second, resultSelector);
#else
      return new AnonymousObservable<TResult>(observer =>
      {
          var pendingLeft = new Queue<TFirst>();
          var pendingRight = new Queue<TSecond>();
          var leftFinished = false;
          var rightFinished = false;
          var leftHandle = new SingleAssignmentDisposable();
          var rightHandle = new SingleAssignmentDisposable();
          var sync = new object();

          leftHandle.Disposable = first.Synchronize(sync).Subscribe(
              leftValue =>
              {
                  if (pendingRight.Count > 0)
                  {
                      var rightValue = pendingRight.Dequeue();
                      TResult combined;
                      try
                      {
                          combined = resultSelector(leftValue, rightValue);
                      }
                      catch (Exception error)
                      {
                          observer.OnError(error);
                          return;
                      }
                      observer.OnNext(combined);
                  }
                  else if (rightFinished)
                  {
                      observer.OnCompleted();
                  }
                  else
                  {
                      pendingLeft.Enqueue(leftValue);
                  }
              },
              observer.OnError,
              () =>
              {
                  leftFinished = true;
                  if (rightFinished)
                  {
                      observer.OnCompleted();
                  }
              });

          rightHandle.Disposable = second.Synchronize(sync).Subscribe(
              rightValue =>
              {
                  if (pendingLeft.Count > 0)
                  {
                      var leftValue = pendingLeft.Dequeue();
                      TResult combined;
                      try
                      {
                          combined = resultSelector(leftValue, rightValue);
                      }
                      catch (Exception error)
                      {
                          observer.OnError(error);
                          return;
                      }
                      observer.OnNext(combined);
                  }
                  else if (leftFinished)
                  {
                      observer.OnCompleted();
                  }
                  else
                  {
                      pendingRight.Enqueue(rightValue);
                  }
              },
              observer.OnError,
              () =>
              {
                  rightFinished = true;
                  if (leftFinished)
                  {
                      observer.OnCompleted();
                  }
              });

          var clearQueues = Disposable.Create(() =>
          {
              pendingLeft.Clear();
              pendingRight.Clear();
          });

          return new CompositeDisposable(leftHandle, rightHandle, clearQueues);
      });
#endif
  }
  // Zips an enumerable of sequences into lists via Zip_, then projects each list through `resultSelector`.
  public virtual IObservable<TResult> Zip<TSource, TResult>(IEnumerable<IObservable<TSource>> sources, Func<IList<TSource>, TResult> resultSelector)
  {
      var lists = Zip_<TSource>(sources);
      return lists.Select(resultSelector);
  }
  // Enumerable overload producing lists: defers directly to Zip_.
  public virtual IObservable<IList<TSource>> Zip<TSource>(IEnumerable<IObservable<TSource>> sources)
  {
      var lists = Zip_<TSource>(sources);
      return lists;
  }
  // Array overload producing lists: the array is passed to Zip_ as an enumerable of sequences.
  public virtual IObservable<IList<TSource>> Zip<TSource>(params IObservable<TSource>[] sources)
  {
      IEnumerable<IObservable<TSource>> sequence = sources;
      return Zip_<TSource>(sequence);
  }
  // Zips any number of sequences into lists holding one element from each sequence.
  //
  // With !NO_PERF this returns the Zip<TSource> operator object. Otherwise `sources` is
  // materialized to an array and one queue is kept per source. After a value is enqueued:
  // if every queue is non-empty, one element is dequeued from each (in source order) and
  // emitted as a list; otherwise, if all *other* sources are done, the output completes.
  // When a source completes, the output completes once every source is done.
  // All callbacks are synchronized on one shared lock object; disposal clears every queue.
  private static IObservable<IList<TSource>> Zip_<TSource>(IEnumerable<IObservable<TSource>> sources)
  {
#if !NO_PERF
      return new Zip<TSource>(sources);
#else
      return new AnonymousObservable<IList<TSource>>(observer =>
      {
          var sourceArray = sources.ToArray();
          var count = sourceArray.Length;

          var queues = new Queue<TSource>[count];
          for (int k = 0; k < count; k++)
          {
              queues[k] = new Queue<TSource>();
          }

          var finished = new bool[count];

          Action<int> onValue = index =>
          {
              if (queues.All(queue => queue.Count > 0))
              {
                  var row = new List<TSource>();
                  foreach (var queue in queues)
                  {
                      row.Add(queue.Dequeue());
                  }
                  observer.OnNext(row);
              }
              else if (finished.Where((flag, position) => position != index).All(Stubs<bool>.I))
              {
                  observer.OnCompleted();
              }
          };

          Action<int> onDone = index =>
          {
              finished[index] = true;
              if (finished.All(Stubs<bool>.I))
              {
                  observer.OnCompleted();
              }
          };

          var handles = new SingleAssignmentDisposable[count];
          var sync = new object();

          for (int k = 0; k < count; k++)
          {
              // Copy the loop variable so each callback captures its own index.
              var slot = k;
              var handle = new SingleAssignmentDisposable();
              handle.Disposable = sourceArray[slot].Synchronize(sync).Subscribe(
                  value =>
                  {
                      queues[slot].Enqueue(value);
                      onValue(slot);
                  },
                  observer.OnError,
                  () => onDone(slot));
              handles[slot] = handle;
          }

          var all = new CompositeDisposable(handles);
          all.Add(Disposable.Create(() =>
          {
              foreach (var queue in queues)
              {
                  queue.Clear();
              }
          }));
          return all;
      });
#endif
  }
  1342. #if !NO_PERF
  1343. /* The following code is generated by a tool checked in to $/.../Source/Tools/CodeGenerators. */
  1344. #region Zip auto-generated code (6/10/2012 8:15:28 PM)
  // 3-source Zip: wraps the sources and `resultSelector` in a Zip operator object.
  public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TResult>(
      IObservable<TSource1> source1,
      IObservable<TSource2> source2,
      IObservable<TSource3> source3,
      Func<TSource1, TSource2, TSource3, TResult> resultSelector)
  {
      var zipped = new Zip<TSource1, TSource2, TSource3, TResult>(
          source1, source2, source3, resultSelector);
      return zipped;
  }
  // 4-source Zip: wraps the sources and `resultSelector` in a Zip operator object.
  public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TResult>(
      IObservable<TSource1> source1,
      IObservable<TSource2> source2,
      IObservable<TSource3> source3,
      IObservable<TSource4> source4,
      Func<TSource1, TSource2, TSource3, TSource4, TResult> resultSelector)
  {
      var zipped = new Zip<TSource1, TSource2, TSource3, TSource4, TResult>(
          source1, source2, source3, source4, resultSelector);
      return zipped;
  }
  1353. #if !NO_LARGEARITY
  // 5-source Zip: wraps the sources and `resultSelector` in a Zip operator object.
  public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TResult>(
      IObservable<TSource1> source1,
      IObservable<TSource2> source2,
      IObservable<TSource3> source3,
      IObservable<TSource4> source4,
      IObservable<TSource5> source5,
      Func<TSource1, TSource2, TSource3, TSource4, TSource5, TResult> resultSelector)
  {
      var zipped = new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TResult>(
          source1, source2, source3, source4, source5, resultSelector);
      return zipped;
  }
  // 6-source Zip: wraps the sources and `resultSelector` in a Zip operator object.
  public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TResult>(
      IObservable<TSource1> source1,
      IObservable<TSource2> source2,
      IObservable<TSource3> source3,
      IObservable<TSource4> source4,
      IObservable<TSource5> source5,
      IObservable<TSource6> source6,
      Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TResult> resultSelector)
  {
      var zipped = new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TResult>(
          source1, source2, source3, source4, source5, source6, resultSelector);
      return zipped;
  }
  // 7-source Zip: wraps the sources and `resultSelector` in a Zip operator object.
  public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TResult>(
      IObservable<TSource1> source1,
      IObservable<TSource2> source2,
      IObservable<TSource3> source3,
      IObservable<TSource4> source4,
      IObservable<TSource5> source5,
      IObservable<TSource6> source6,
      IObservable<TSource7> source7,
      Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TResult> resultSelector)
  {
      var zipped = new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TResult>(
          source1, source2, source3, source4, source5, source6, source7, resultSelector);
      return zipped;
  }
  // 8-source Zip: wraps the sources and `resultSelector` in a Zip operator object.
  public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult>(
      IObservable<TSource1> source1,
      IObservable<TSource2> source2,
      IObservable<TSource3> source3,
      IObservable<TSource4> source4,
      IObservable<TSource5> source5,
      IObservable<TSource6> source6,
      IObservable<TSource7> source7,
      IObservable<TSource8> source8,
      Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult> resultSelector)
  {
      var zipped = new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8, resultSelector);
      return zipped;
  }
  // 9-source Zip: wraps the sources and `resultSelector` in a Zip operator object.
  public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TResult>(
      IObservable<TSource1> source1,
      IObservable<TSource2> source2,
      IObservable<TSource3> source3,
      IObservable<TSource4> source4,
      IObservable<TSource5> source5,
      IObservable<TSource6> source6,
      IObservable<TSource7> source7,
      IObservable<TSource8> source8,
      IObservable<TSource9> source9,
      Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TResult> resultSelector)
  {
      var zipped = new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8, source9, resultSelector);
      return zipped;
  }
  // 10-source Zip: wraps the sources and `resultSelector` in a Zip operator object.
  public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TResult>(
      IObservable<TSource1> source1,
      IObservable<TSource2> source2,
      IObservable<TSource3> source3,
      IObservable<TSource4> source4,
      IObservable<TSource5> source5,
      IObservable<TSource6> source6,
      IObservable<TSource7> source7,
      IObservable<TSource8> source8,
      IObservable<TSource9> source9,
      IObservable<TSource10> source10,
      Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TResult> resultSelector)
  {
      var zipped = new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, resultSelector);
      return zipped;
  }
  // 11-source Zip: wraps the sources and `resultSelector` in a Zip operator object.
  public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TResult>(
      IObservable<TSource1> source1,
      IObservable<TSource2> source2,
      IObservable<TSource3> source3,
      IObservable<TSource4> source4,
      IObservable<TSource5> source5,
      IObservable<TSource6> source6,
      IObservable<TSource7> source7,
      IObservable<TSource8> source8,
      IObservable<TSource9> source9,
      IObservable<TSource10> source10,
      IObservable<TSource11> source11,
      Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TResult> resultSelector)
  {
      var zipped = new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, resultSelector);
      return zipped;
  }
  // 12-source Zip: wraps the sources and `resultSelector` in a Zip operator object.
  public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TResult>(
      IObservable<TSource1> source1,
      IObservable<TSource2> source2,
      IObservable<TSource3> source3,
      IObservable<TSource4> source4,
      IObservable<TSource5> source5,
      IObservable<TSource6> source6,
      IObservable<TSource7> source7,
      IObservable<TSource8> source8,
      IObservable<TSource9> source9,
      IObservable<TSource10> source10,
      IObservable<TSource11> source11,
      IObservable<TSource12> source12,
      Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TResult> resultSelector)
  {
      var zipped = new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, resultSelector);
      return zipped;
  }
  // 13-source Zip: wraps the sources and `resultSelector` in a Zip operator object.
  public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TResult>(
      IObservable<TSource1> source1,
      IObservable<TSource2> source2,
      IObservable<TSource3> source3,
      IObservable<TSource4> source4,
      IObservable<TSource5> source5,
      IObservable<TSource6> source6,
      IObservable<TSource7> source7,
      IObservable<TSource8> source8,
      IObservable<TSource9> source9,
      IObservable<TSource10> source10,
      IObservable<TSource11> source11,
      IObservable<TSource12> source12,
      IObservable<TSource13> source13,
      Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TResult> resultSelector)
  {
      var zipped = new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, resultSelector);
      return zipped;
  }
  // 14-source Zip: wraps the sources and `resultSelector` in a Zip operator object.
  public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TResult>(
      IObservable<TSource1> source1,
      IObservable<TSource2> source2,
      IObservable<TSource3> source3,
      IObservable<TSource4> source4,
      IObservable<TSource5> source5,
      IObservable<TSource6> source6,
      IObservable<TSource7> source7,
      IObservable<TSource8> source8,
      IObservable<TSource9> source9,
      IObservable<TSource10> source10,
      IObservable<TSource11> source11,
      IObservable<TSource12> source12,
      IObservable<TSource13> source13,
      IObservable<TSource14> source14,
      Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TResult> resultSelector)
  {
      var zipped = new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, source14, resultSelector);
      return zipped;
  }
  // 15-source Zip: wraps the sources and `resultSelector` in a Zip operator object.
  public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TResult>(
      IObservable<TSource1> source1,
      IObservable<TSource2> source2,
      IObservable<TSource3> source3,
      IObservable<TSource4> source4,
      IObservable<TSource5> source5,
      IObservable<TSource6> source6,
      IObservable<TSource7> source7,
      IObservable<TSource8> source8,
      IObservable<TSource9> source9,
      IObservable<TSource10> source10,
      IObservable<TSource11> source11,
      IObservable<TSource12> source12,
      IObservable<TSource13> source13,
      IObservable<TSource14> source14,
      IObservable<TSource15> source15,
      Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TResult> resultSelector)
  {
      var zipped = new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, source14, source15, resultSelector);
      return zipped;
  }
  // 16-source Zip: wraps the sources and `resultSelector` in a Zip operator object.
  public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16, TResult>(
      IObservable<TSource1> source1,
      IObservable<TSource2> source2,
      IObservable<TSource3> source3,
      IObservable<TSource4> source4,
      IObservable<TSource5> source5,
      IObservable<TSource6> source6,
      IObservable<TSource7> source7,
      IObservable<TSource8> source8,
      IObservable<TSource9> source9,
      IObservable<TSource10> source10,
      IObservable<TSource11> source11,
      IObservable<TSource12> source12,
      IObservable<TSource13> source13,
      IObservable<TSource14> source14,
      IObservable<TSource15> source15,
      IObservable<TSource16> source16,
      Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16, TResult> resultSelector)
  {
      var zipped = new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16, TResult>(
          source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, source14, source15, source16, resultSelector);
      return zipped;
  }
  1402. #endif
  1403. #endregion
  1404. #endif
  // Zips an observable sequence with an enumerable one.
  //
  // With !NO_PERF this returns the Zip operator object. Otherwise, for every value of `first`
  // the enumerator over `second` is advanced: if it yields an element, the pair is projected
  // through `resultSelector` and emitted; if it is exhausted, the output completes. Exceptions
  // from MoveNext, Current or the selector are reported through OnError. Errors and completion
  // of `first` are forwarded as-is. The returned disposable covers both the subscription and
  // the enumerator.
  public virtual IObservable<TResult> Zip<TFirst, TSecond, TResult>(IObservable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
  {
#if !NO_PERF
      return new Zip<TFirst, TSecond, TResult>(first, second, resultSelector);
#else
      return new AnonymousObservable<TResult>(observer =>
      {
          var enumerator = second.GetEnumerator();

          Action<TFirst> onLeft = leftValue =>
          {
              bool advanced;
              try
              {
                  advanced = enumerator.MoveNext();
              }
              catch (Exception error)
              {
                  observer.OnError(error);
                  return;
              }

              if (!advanced)
              {
                  observer.OnCompleted();
                  return;
              }

              TSecond rightValue;
              try
              {
                  rightValue = enumerator.Current;
              }
              catch (Exception error)
              {
                  observer.OnError(error);
                  return;
              }

              TResult combined;
              try
              {
                  combined = resultSelector(leftValue, rightValue);
              }
              catch (Exception error)
              {
                  observer.OnError(error);
                  return;
              }

              observer.OnNext(combined);
          };

          var subscription = first.Subscribe(onLeft, observer.OnError, observer.OnCompleted);

          return new CompositeDisposable(subscription, enumerator);
      });
#endif
  }
  1461. #endregion
  1462. #region |> Helpers <|
  1463. #if NO_PERF
  // Helper that funnels two sequences into one combiner observer.
  //
  // Both sources are materialized into notifications, tagged as the left or right case of
  // an Either, synchronized on a shared lock object and subscribed to the observer obtained
  // from `combinerSelector`. The selector receives the downstream observer together with
  // the two subscription holders.
  private static IObservable<TResult> Combine<TLeft, TRight, TResult>(IObservable<TLeft> leftSource, IObservable<TRight> rightSource, Func<IObserver<TResult>, IDisposable, IDisposable, IObserver<Either<Notification<TLeft>, Notification<TRight>>>> combinerSelector)
  {
      return new AnonymousObservable<TResult>(observer =>
      {
          var leftHandle = new SingleAssignmentDisposable();
          var rightHandle = new SingleAssignmentDisposable();

          var combiner = combinerSelector(observer, leftHandle, rightHandle);
          var sync = new object();

          var taggedLeft = leftSource
              .Materialize()
              .Select(n => Either<Notification<TLeft>, Notification<TRight>>.CreateLeft(n))
              .Synchronize(sync);
          leftHandle.Disposable = taggedLeft.Subscribe(combiner);

          var taggedRight = rightSource
              .Materialize()
              .Select(n => Either<Notification<TLeft>, Notification<TRight>>.CreateRight(n))
              .Synchronize(sync);
          rightHandle.Disposable = taggedRight.Subscribe(combiner);

          return new CompositeDisposable(leftHandle, rightHandle);
      });
  }
  1477. #endif
  1478. #endregion
  1479. }
  1480. }