QueryLanguage.Multiple.cs 78 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections;
  5. using System.Collections.Generic;
  6. using System.Collections.ObjectModel;
  7. using System.Linq;
  8. using System.Reactive.Concurrency;
  9. using System.Reactive.Disposables;
  10. using System.Reactive.Subjects;
  11. using System.Reactive.Threading.Tasks;
  12. using System.Threading.Tasks;
  13. namespace System.Reactive.Linq
  14. {
  15. #if !NO_PERF
  16. using ObservableImpl;
  17. #endif
  18. internal partial class QueryLanguage
  19. {
  20. #region + Amb +
  21. public virtual IObservable<TSource> Amb<TSource>(IObservable<TSource> first, IObservable<TSource> second)
  22. {
  23. return Amb_(first, second);
  24. }
  25. public virtual IObservable<TSource> Amb<TSource>(params IObservable<TSource>[] sources)
  26. {
  27. return Amb_(sources);
  28. }
  29. public virtual IObservable<TSource> Amb<TSource>(IEnumerable<IObservable<TSource>> sources)
  30. {
  31. return Amb_(sources);
  32. }
  33. private static IObservable<TSource> Amb_<TSource>(IEnumerable<IObservable<TSource>> sources)
  34. {
  35. return sources.Aggregate(Observable.Never<TSource>(), (previous, current) => previous.Amb(current));
  36. }
  37. private static IObservable<TSource> Amb_<TSource>(IObservable<TSource> leftSource, IObservable<TSource> rightSource)
  38. {
  39. #if !NO_PERF
  40. return new Amb<TSource>(leftSource, rightSource);
  41. #else
  42. return new AnonymousObservable<TSource>(observer =>
  43. {
  44. var leftSubscription = new SingleAssignmentDisposable();
  45. var rightSubscription = new SingleAssignmentDisposable();
  46. var choice = AmbState.Neither;
  47. var gate = new object();
  48. var left = new AmbObserver<TSource>();
  49. var right = new AmbObserver<TSource>();
  50. left.Observer = Observer.Synchronize(Observer.Create<TSource>(
  51. x =>
  52. {
  53. if (choice == AmbState.Neither)
  54. {
  55. choice = AmbState.Left;
  56. rightSubscription.Dispose();
  57. left.Observer = observer;
  58. }
  59. if (choice == AmbState.Left)
  60. observer.OnNext(x);
  61. },
  62. ex =>
  63. {
  64. if (choice == AmbState.Neither)
  65. {
  66. choice = AmbState.Left;
  67. rightSubscription.Dispose();
  68. left.Observer = observer;
  69. }
  70. if (choice == AmbState.Left)
  71. observer.OnError(ex);
  72. },
  73. () =>
  74. {
  75. if (choice == AmbState.Neither)
  76. {
  77. choice = AmbState.Left;
  78. rightSubscription.Dispose();
  79. left.Observer = observer;
  80. }
  81. if (choice == AmbState.Left)
  82. observer.OnCompleted();
  83. }
  84. ), gate);
  85. right.Observer = Observer.Synchronize(Observer.Create<TSource>(
  86. x =>
  87. {
  88. if (choice == AmbState.Neither)
  89. {
  90. choice = AmbState.Right;
  91. leftSubscription.Dispose();
  92. right.Observer = observer;
  93. }
  94. if (choice == AmbState.Right)
  95. observer.OnNext(x);
  96. },
  97. ex =>
  98. {
  99. if (choice == AmbState.Neither)
  100. {
  101. choice = AmbState.Right;
  102. leftSubscription.Dispose();
  103. right.Observer = observer;
  104. }
  105. if (choice == AmbState.Right)
  106. observer.OnError(ex);
  107. },
  108. () =>
  109. {
  110. if (choice == AmbState.Neither)
  111. {
  112. choice = AmbState.Right;
  113. leftSubscription.Dispose();
  114. right.Observer = observer;
  115. }
  116. if (choice == AmbState.Right)
  117. observer.OnCompleted();
  118. }
  119. ), gate);
  120. leftSubscription.Disposable = leftSource.Subscribe(left);
  121. rightSubscription.Disposable = rightSource.Subscribe(right);
  122. return new CompositeDisposable(leftSubscription, rightSubscription);
  123. });
  124. #endif
  125. }
  126. #if NO_PERF
  127. class AmbObserver<TSource> : IObserver<TSource>
  128. {
  129. public virtual IObserver<TSource> Observer { get; set; }
  130. public virtual void OnCompleted()
  131. {
  132. Observer.OnCompleted();
  133. }
  134. public virtual void OnError(Exception error)
  135. {
  136. Observer.OnError(error);
  137. }
  138. public virtual void OnNext(TSource value)
  139. {
  140. Observer.OnNext(value);
  141. }
  142. }
  143. enum AmbState
  144. {
  145. Left,
  146. Right,
  147. Neither
  148. }
  149. #endif
  150. #endregion
  151. #region + Buffer +
  152. public virtual IObservable<IList<TSource>> Buffer<TSource, TBufferClosing>(IObservable<TSource> source, Func<IObservable<TBufferClosing>> bufferClosingSelector)
  153. {
  154. #if !NO_PERF
  155. return new Buffer<TSource, TBufferClosing>(source, bufferClosingSelector);
  156. #else
  157. return source.Window(bufferClosingSelector).SelectMany(ToList);
  158. #endif
  159. }
  160. public virtual IObservable<IList<TSource>> Buffer<TSource, TBufferOpening, TBufferClosing>(IObservable<TSource> source, IObservable<TBufferOpening> bufferOpenings, Func<TBufferOpening, IObservable<TBufferClosing>> bufferClosingSelector)
  161. {
  162. return source.Window(bufferOpenings, bufferClosingSelector).SelectMany(ToList);
  163. }
  164. public virtual IObservable<IList<TSource>> Buffer<TSource, TBufferBoundary>(IObservable<TSource> source, IObservable<TBufferBoundary> bufferBoundaries)
  165. {
  166. #if !NO_PERF
  167. return new Buffer<TSource, TBufferBoundary>(source, bufferBoundaries);
  168. #else
  169. return source.Window(bufferBoundaries).SelectMany(ToList);
  170. #endif
  171. }
  172. #endregion
  173. #region + Catch +
  174. public virtual IObservable<TSource> Catch<TSource, TException>(IObservable<TSource> source, Func<TException, IObservable<TSource>> handler) where TException : Exception
  175. {
  176. #if !NO_PERF
  177. return new Catch<TSource, TException>(source, handler);
  178. #else
  179. return new AnonymousObservable<TSource>(observer =>
  180. {
  181. var subscription = new SerialDisposable();
  182. var d1 = new SingleAssignmentDisposable();
  183. subscription.Disposable = d1;
  184. d1.Disposable = source.Subscribe(observer.OnNext,
  185. exception =>
  186. {
  187. var e = exception as TException;
  188. if (e != null)
  189. {
  190. IObservable<TSource> result;
  191. try
  192. {
  193. result = handler(e);
  194. }
  195. catch (Exception ex)
  196. {
  197. observer.OnError(ex);
  198. return;
  199. }
  200. var d = new SingleAssignmentDisposable();
  201. subscription.Disposable = d;
  202. d.Disposable = result.Subscribe(observer);
  203. }
  204. else
  205. observer.OnError(exception);
  206. }, observer.OnCompleted);
  207. return subscription;
  208. });
  209. #endif
  210. }
  211. public virtual IObservable<TSource> Catch<TSource>(IObservable<TSource> first, IObservable<TSource> second)
  212. {
  213. return Catch_<TSource>(new[] { first, second });
  214. }
  215. public virtual IObservable<TSource> Catch<TSource>(params IObservable<TSource>[] sources)
  216. {
  217. return Catch_<TSource>(sources);
  218. }
  219. public virtual IObservable<TSource> Catch<TSource>(IEnumerable<IObservable<TSource>> sources)
  220. {
  221. return Catch_<TSource>(sources);
  222. }
  223. private static IObservable<TSource> Catch_<TSource>(IEnumerable<IObservable<TSource>> sources)
  224. {
  225. #if !NO_PERF
  226. return new Catch<TSource>(sources);
  227. #else
  228. return new AnonymousObservable<TSource>(observer =>
  229. {
  230. var gate = new AsyncLock();
  231. var isDisposed = false;
  232. var e = sources.GetEnumerator();
  233. var subscription = new SerialDisposable();
  234. var lastException = default(Exception);
  235. var cancelable = SchedulerDefaults.TailRecursion.Schedule(self => gate.Wait(() =>
  236. {
  237. var current = default(IObservable<TSource>);
  238. var hasNext = false;
  239. var ex = default(Exception);
  240. if (!isDisposed)
  241. {
  242. try
  243. {
  244. hasNext = e.MoveNext();
  245. if (hasNext)
  246. current = e.Current;
  247. else
  248. e.Dispose();
  249. }
  250. catch (Exception exception)
  251. {
  252. ex = exception;
  253. e.Dispose();
  254. }
  255. }
  256. else
  257. return;
  258. if (ex != null)
  259. {
  260. observer.OnError(ex);
  261. return;
  262. }
  263. if (!hasNext)
  264. {
  265. if (lastException != null)
  266. observer.OnError(lastException);
  267. else
  268. observer.OnCompleted();
  269. return;
  270. }
  271. var d = new SingleAssignmentDisposable();
  272. subscription.Disposable = d;
  273. d.Disposable = current.Subscribe(observer.OnNext, exception =>
  274. {
  275. lastException = exception;
  276. self();
  277. }, observer.OnCompleted);
  278. }));
  279. return new CompositeDisposable(subscription, cancelable, Disposable.Create(() => gate.Wait(() =>
  280. {
  281. e.Dispose();
  282. isDisposed = true;
  283. })));
  284. });
  285. #endif
  286. }
  287. #endregion
  288. #region + CombineLatest +
  289. public virtual IObservable<TResult> CombineLatest<TFirst, TSecond, TResult>(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
  290. {
  291. #if !NO_PERF
  292. return new CombineLatest<TFirst, TSecond, TResult>(first, second, resultSelector);
  293. #else
  294. return new AnonymousObservable<TResult>(observer =>
  295. {
  296. var hasLeft = false;
  297. var hasRight = false;
  298. var left = default(TFirst);
  299. var right = default(TSecond);
  300. var leftDone = false;
  301. var rightDone = false;
  302. var leftSubscription = new SingleAssignmentDisposable();
  303. var rightSubscription = new SingleAssignmentDisposable();
  304. var gate = new object();
  305. leftSubscription.Disposable = first.Synchronize(gate).Subscribe(
  306. l =>
  307. {
  308. hasLeft = true;
  309. left = l;
  310. if (hasRight)
  311. {
  312. var res = default(TResult);
  313. try
  314. {
  315. res = resultSelector(left, right);
  316. }
  317. catch (Exception ex)
  318. {
  319. observer.OnError(ex);
  320. return;
  321. }
  322. observer.OnNext(res);
  323. }
  324. else if (rightDone)
  325. {
  326. observer.OnCompleted();
  327. return;
  328. }
  329. },
  330. observer.OnError,
  331. () =>
  332. {
  333. leftDone = true;
  334. if (rightDone)
  335. {
  336. observer.OnCompleted();
  337. return;
  338. }
  339. }
  340. );
  341. rightSubscription.Disposable = second.Synchronize(gate).Subscribe(
  342. r =>
  343. {
  344. hasRight = true;
  345. right = r;
  346. if (hasLeft)
  347. {
  348. var res = default(TResult);
  349. try
  350. {
  351. res = resultSelector(left, right);
  352. }
  353. catch (Exception ex)
  354. {
  355. observer.OnError(ex);
  356. return;
  357. }
  358. observer.OnNext(res);
  359. }
  360. else if (leftDone)
  361. {
  362. observer.OnCompleted();
  363. return;
  364. }
  365. },
  366. observer.OnError,
  367. () =>
  368. {
  369. rightDone = true;
  370. if (leftDone)
  371. {
  372. observer.OnCompleted();
  373. return;
  374. }
  375. }
  376. );
  377. return new CompositeDisposable(leftSubscription, rightSubscription);
  378. });
  379. #endif
  380. }
  381. #if !NO_PERF
  382. /* The following code is generated by a tool checked in to $/.../Source/Tools/CodeGenerators. */
  383. #region CombineLatest auto-generated code (6/10/2012 7:25:03 PM)
  384. public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, Func<TSource1, TSource2, TSource3, TResult> resultSelector)
  385. {
  386. return new CombineLatest<TSource1, TSource2, TSource3, TResult>(source1, source2, source3, resultSelector);
  387. }
  388. public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, Func<TSource1, TSource2, TSource3, TSource4, TResult> resultSelector)
  389. {
  390. return new CombineLatest<TSource1, TSource2, TSource3, TSource4, TResult>(source1, source2, source3, source4, resultSelector);
  391. }
  392. public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TResult> resultSelector)
  393. {
  394. return new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TResult>(source1, source2, source3, source4, source5, resultSelector);
  395. }
  396. public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TResult> resultSelector)
  397. {
  398. return new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TResult>(source1, source2, source3, source4, source5, source6, resultSelector);
  399. }
  400. public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TResult> resultSelector)
  401. {
  402. return new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TResult>(source1, source2, source3, source4, source5, source6, source7, resultSelector);
  403. }
  404. public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult> resultSelector)
  405. {
  406. return new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, resultSelector);
  407. }
  408. public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TResult> resultSelector)
  409. {
  410. return new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, resultSelector);
  411. }
  412. public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TResult> resultSelector)
  413. {
  414. return new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, resultSelector);
  415. }
  416. public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TResult> resultSelector)
  417. {
  418. return new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, resultSelector);
  419. }
  420. public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TResult> resultSelector)
  421. {
  422. return new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, resultSelector);
  423. }
  424. public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12, IObservable<TSource13> source13, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TResult> resultSelector)
  425. {
  426. return new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, resultSelector);
  427. }
  428. public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12, IObservable<TSource13> source13, IObservable<TSource14> source14, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TResult> resultSelector)
  429. {
  430. return new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, source14, resultSelector);
  431. }
  432. public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12, IObservable<TSource13> source13, IObservable<TSource14> source14, IObservable<TSource15> source15, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TResult> resultSelector)
  433. {
  434. return new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, source14, source15, resultSelector);
  435. }
  436. public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12, IObservable<TSource13> source13, IObservable<TSource14> source14, IObservable<TSource15> source15, IObservable<TSource16> source16, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16, TResult> resultSelector)
  437. {
  438. return new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, source14, source15, source16, resultSelector);
  439. }
  440. #endregion
  441. #endif
  442. public virtual IObservable<TResult> CombineLatest<TSource, TResult>(IEnumerable<IObservable<TSource>> sources, Func<IList<TSource>, TResult> resultSelector)
  443. {
  444. return CombineLatest_<TSource, TResult>(sources, resultSelector);
  445. }
  446. public virtual IObservable<IList<TSource>> CombineLatest<TSource>(IEnumerable<IObservable<TSource>> sources)
  447. {
  448. return CombineLatest_<TSource, IList<TSource>>(sources, res => res.ToList());
  449. }
  450. public virtual IObservable<IList<TSource>> CombineLatest<TSource>(params IObservable<TSource>[] sources)
  451. {
  452. return CombineLatest_<TSource, IList<TSource>>(sources, res => res.ToList());
  453. }
  454. private static IObservable<TResult> CombineLatest_<TSource, TResult>(IEnumerable<IObservable<TSource>> sources, Func<IList<TSource>, TResult> resultSelector)
  455. {
  456. #if !NO_PERF
  457. return new CombineLatest<TSource, TResult>(sources, resultSelector);
  458. #else
  459. return new AnonymousObservable<TResult>(observer =>
  460. {
  461. var srcs = sources.ToArray();
  462. var N = srcs.Length;
  463. var hasValue = new bool[N];
  464. var hasValueAll = false;
  465. var values = new List<TSource>(N);
  466. for (int i = 0; i < N; i++)
  467. values.Add(default(TSource));
  468. var isDone = new bool[N];
  469. var next = new Action<int>(i =>
  470. {
  471. hasValue[i] = true;
  472. if (hasValueAll || (hasValueAll = hasValue.All(Stubs<bool>.I)))
  473. {
  474. var res = default(TResult);
  475. try
  476. {
  477. res = resultSelector(new ReadOnlyCollection<TSource>(values));
  478. }
  479. catch (Exception ex)
  480. {
  481. observer.OnError(ex);
  482. return;
  483. }
  484. observer.OnNext(res);
  485. }
  486. else if (isDone.Where((x, j) => j != i).All(Stubs<bool>.I))
  487. {
  488. observer.OnCompleted();
  489. return;
  490. }
  491. });
  492. var done = new Action<int>(i =>
  493. {
  494. isDone[i] = true;
  495. if (isDone.All(Stubs<bool>.I))
  496. {
  497. observer.OnCompleted();
  498. return;
  499. }
  500. });
  501. var subscriptions = new SingleAssignmentDisposable[N];
  502. var gate = new object();
  503. for (int i = 0; i < N; i++)
  504. {
  505. var j = i;
  506. subscriptions[j] = new SingleAssignmentDisposable
  507. {
  508. Disposable = srcs[j].Synchronize(gate).Subscribe(
  509. x =>
  510. {
  511. values[j] = x;
  512. next(j);
  513. },
  514. observer.OnError,
  515. () =>
  516. {
  517. done(j);
  518. }
  519. )
  520. };
  521. }
  522. return new CompositeDisposable(subscriptions);
  523. });
  524. #endif
  525. }
  526. #endregion
  527. #region + Concat +
  528. public virtual IObservable<TSource> Concat<TSource>(IObservable<TSource> first, IObservable<TSource> second)
  529. {
  530. return Concat_<TSource>(new[] { first, second });
  531. }
  532. public virtual IObservable<TSource> Concat<TSource>(params IObservable<TSource>[] sources)
  533. {
  534. return Concat_<TSource>(sources);
  535. }
  536. public virtual IObservable<TSource> Concat<TSource>(IEnumerable<IObservable<TSource>> sources)
  537. {
  538. return Concat_<TSource>(sources);
  539. }
  540. private static IObservable<TSource> Concat_<TSource>(IEnumerable<IObservable<TSource>> sources)
  541. {
  542. #if !NO_PERF
  543. return new Concat<TSource>(sources);
  544. #else
  545. return new AnonymousObservable<TSource>(observer =>
  546. {
  547. var isDisposed = false;
  548. var e = sources.GetEnumerator();
  549. var subscription = new SerialDisposable();
  550. var gate = new AsyncLock();
  551. var cancelable = SchedulerDefaults.TailRecursion.Schedule(self => gate.Wait(() =>
  552. {
  553. var current = default(IObservable<TSource>);
  554. var hasNext = false;
  555. var ex = default(Exception);
  556. if (!isDisposed)
  557. {
  558. try
  559. {
  560. hasNext = e.MoveNext();
  561. if (hasNext)
  562. current = e.Current;
  563. else
  564. e.Dispose();
  565. }
  566. catch (Exception exception)
  567. {
  568. ex = exception;
  569. e.Dispose();
  570. }
  571. }
  572. else
  573. return;
  574. if (ex != null)
  575. {
  576. observer.OnError(ex);
  577. return;
  578. }
  579. if (!hasNext)
  580. {
  581. observer.OnCompleted();
  582. return;
  583. }
  584. var d = new SingleAssignmentDisposable();
  585. subscription.Disposable = d;
  586. d.Disposable = current.Subscribe(observer.OnNext, observer.OnError, self);
  587. }));
  588. return new CompositeDisposable(subscription, cancelable, Disposable.Create(() => gate.Wait(() =>
  589. {
  590. e.Dispose();
  591. isDisposed = true;
  592. })));
  593. });
  594. #endif
  595. }
  596. public virtual IObservable<TSource> Concat<TSource>(IObservable<IObservable<TSource>> sources)
  597. {
  598. return Concat_<TSource>(sources);
  599. }
  600. public virtual IObservable<TSource> Concat<TSource>(IObservable<Task<TSource>> sources)
  601. {
  602. return Concat_<TSource>(Select(sources, TaskObservableExtensions.ToObservable));
  603. }
  604. private IObservable<TSource> Concat_<TSource>(IObservable<IObservable<TSource>> sources)
  605. {
  606. return Merge(sources, 1);
  607. }
  608. #endregion
  609. #region + Merge +
  610. public virtual IObservable<TSource> Merge<TSource>(IObservable<IObservable<TSource>> sources)
  611. {
  612. return Merge_<TSource>(sources);
  613. }
  614. public virtual IObservable<TSource> Merge<TSource>(IObservable<Task<TSource>> sources)
  615. {
  616. #if !NO_PERF
  617. return new Merge<TSource>(sources);
  618. #else
  619. return Merge_<TSource>(Select(sources, TaskObservableExtensions.ToObservable));
  620. #endif
  621. }
  622. public virtual IObservable<TSource> Merge<TSource>(IObservable<IObservable<TSource>> sources, int maxConcurrent)
  623. {
  624. return Merge_<TSource>(sources, maxConcurrent);
  625. }
  626. public virtual IObservable<TSource> Merge<TSource>(IEnumerable<IObservable<TSource>> sources, int maxConcurrent)
  627. {
  628. return Merge_<TSource>(sources.ToObservable(SchedulerDefaults.ConstantTimeOperations), maxConcurrent);
  629. }
  630. public virtual IObservable<TSource> Merge<TSource>(IEnumerable<IObservable<TSource>> sources, int maxConcurrent, IScheduler scheduler)
  631. {
  632. return Merge_<TSource>(sources.ToObservable(scheduler), maxConcurrent);
  633. }
  634. public virtual IObservable<TSource> Merge<TSource>(IObservable<TSource> first, IObservable<TSource> second)
  635. {
  636. return Merge_<TSource>(new[] { first, second }.ToObservable(SchedulerDefaults.ConstantTimeOperations));
  637. }
  638. public virtual IObservable<TSource> Merge<TSource>(IObservable<TSource> first, IObservable<TSource> second, IScheduler scheduler)
  639. {
  640. return Merge_<TSource>(new[] { first, second }.ToObservable(scheduler));
  641. }
  642. public virtual IObservable<TSource> Merge<TSource>(params IObservable<TSource>[] sources)
  643. {
  644. return Merge_<TSource>(sources.ToObservable(SchedulerDefaults.ConstantTimeOperations));
  645. }
  646. public virtual IObservable<TSource> Merge<TSource>(IScheduler scheduler, params IObservable<TSource>[] sources)
  647. {
  648. return Merge_<TSource>(sources.ToObservable(scheduler));
  649. }
  650. public virtual IObservable<TSource> Merge<TSource>(IEnumerable<IObservable<TSource>> sources)
  651. {
  652. return Merge_<TSource>(sources.ToObservable(SchedulerDefaults.ConstantTimeOperations));
  653. }
  654. public virtual IObservable<TSource> Merge<TSource>(IEnumerable<IObservable<TSource>> sources, IScheduler scheduler)
  655. {
  656. return Merge_<TSource>(sources.ToObservable(scheduler));
  657. }
  658. private static IObservable<TSource> Merge_<TSource>(IObservable<IObservable<TSource>> sources)
  659. {
  660. #if !NO_PERF
  661. return new Merge<TSource>(sources);
  662. #else
  663. return new AnonymousObservable<TSource>(observer =>
  664. {
  665. var gate = new object();
  666. var isStopped = false;
  667. var m = new SingleAssignmentDisposable();
  668. var group = new CompositeDisposable() { m };
  669. m.Disposable = sources.Subscribe(
  670. innerSource =>
  671. {
  672. var innerSubscription = new SingleAssignmentDisposable();
  673. group.Add(innerSubscription);
  674. innerSubscription.Disposable = innerSource.Subscribe(
  675. x =>
  676. {
  677. lock (gate)
  678. observer.OnNext(x);
  679. },
  680. exception =>
  681. {
  682. lock (gate)
  683. observer.OnError(exception);
  684. },
  685. () =>
  686. {
  687. group.Remove(innerSubscription); // modification MUST occur before subsequent check
  688. if (isStopped && group.Count == 1) // isStopped must be checked before group Count to ensure outer is not creating more groups
  689. lock (gate)
  690. observer.OnCompleted();
  691. });
  692. },
  693. exception =>
  694. {
  695. lock (gate)
  696. observer.OnError(exception);
  697. },
  698. () =>
  699. {
  700. isStopped = true; // modification MUST occur before subsequent check
  701. if (group.Count == 1)
  702. lock (gate)
  703. observer.OnCompleted();
  704. });
  705. return group;
  706. });
  707. #endif
  708. }
  709. private static IObservable<TSource> Merge_<TSource>(IObservable<IObservable<TSource>> sources, int maxConcurrent)
  710. {
  711. #if !NO_PERF
  712. return new Merge<TSource>(sources, maxConcurrent);
  713. #else
  714. return new AnonymousObservable<TSource>(observer =>
  715. {
  716. var gate = new object();
  717. var q = new Queue<IObservable<TSource>>();
  718. var isStopped = false;
  719. var group = new CompositeDisposable();
  720. var activeCount = 0;
  721. var subscribe = default(Action<IObservable<TSource>>);
  722. subscribe = xs =>
  723. {
  724. var subscription = new SingleAssignmentDisposable();
  725. group.Add(subscription);
  726. subscription.Disposable = xs.Subscribe(
  727. x =>
  728. {
  729. lock (gate)
  730. observer.OnNext(x);
  731. },
  732. exception =>
  733. {
  734. lock (gate)
  735. observer.OnError(exception);
  736. },
  737. () =>
  738. {
  739. group.Remove(subscription);
  740. lock (gate)
  741. {
  742. if (q.Count > 0)
  743. {
  744. var s = q.Dequeue();
  745. subscribe(s);
  746. }
  747. else
  748. {
  749. activeCount--;
  750. if (isStopped && activeCount == 0)
  751. observer.OnCompleted();
  752. }
  753. }
  754. });
  755. };
  756. group.Add(sources.Subscribe(
  757. innerSource =>
  758. {
  759. lock (gate)
  760. {
  761. if (activeCount < maxConcurrent)
  762. {
  763. activeCount++;
  764. subscribe(innerSource);
  765. }
  766. else
  767. q.Enqueue(innerSource);
  768. }
  769. },
  770. exception =>
  771. {
  772. lock (gate)
  773. observer.OnError(exception);
  774. },
  775. () =>
  776. {
  777. lock (gate)
  778. {
  779. isStopped = true;
  780. if (activeCount == 0)
  781. observer.OnCompleted();
  782. }
  783. }));
  784. return group;
  785. });
  786. #endif
  787. }
  788. #endregion
  789. #region + OnErrorResumeNext +
  790. public virtual IObservable<TSource> OnErrorResumeNext<TSource>(IObservable<TSource> first, IObservable<TSource> second)
  791. {
  792. return OnErrorResumeNext_<TSource>(new[] { first, second });
  793. }
  794. public virtual IObservable<TSource> OnErrorResumeNext<TSource>(params IObservable<TSource>[] sources)
  795. {
  796. return OnErrorResumeNext_<TSource>(sources);
  797. }
  798. public virtual IObservable<TSource> OnErrorResumeNext<TSource>(IEnumerable<IObservable<TSource>> sources)
  799. {
  800. return OnErrorResumeNext_<TSource>(sources);
  801. }
  802. private static IObservable<TSource> OnErrorResumeNext_<TSource>(IEnumerable<IObservable<TSource>> sources)
  803. {
  804. #if !NO_PERF
  805. return new OnErrorResumeNext<TSource>(sources);
  806. #else
  807. return new AnonymousObservable<TSource>(observer =>
  808. {
  809. var gate = new AsyncLock();
  810. var isDisposed = false;
  811. var e = sources.GetEnumerator();
  812. var subscription = new SerialDisposable();
  813. var cancelable = SchedulerDefaults.TailRecursion.Schedule(self => gate.Wait(() =>
  814. {
  815. var current = default(IObservable<TSource>);
  816. var hasNext = false;
  817. var ex = default(Exception);
  818. if (!isDisposed)
  819. {
  820. try
  821. {
  822. hasNext = e.MoveNext();
  823. if (hasNext)
  824. current = e.Current;
  825. else
  826. e.Dispose();
  827. }
  828. catch (Exception exception)
  829. {
  830. ex = exception;
  831. e.Dispose();
  832. }
  833. }
  834. else
  835. return;
  836. if (ex != null)
  837. {
  838. observer.OnError(ex);
  839. return;
  840. }
  841. if (!hasNext)
  842. {
  843. observer.OnCompleted();
  844. return;
  845. }
  846. var d = new SingleAssignmentDisposable();
  847. subscription.Disposable = d;
  848. d.Disposable = current.Subscribe(observer.OnNext, exception => self(), self);
  849. }));
  850. return new CompositeDisposable(subscription, cancelable, Disposable.Create(() => gate.Wait(() =>
  851. {
  852. e.Dispose();
  853. isDisposed = true;
  854. })));
  855. });
  856. #endif
  857. }
  858. #endregion
  859. #region + SkipUntil +
  860. public virtual IObservable<TSource> SkipUntil<TSource, TOther>(IObservable<TSource> source, IObservable<TOther> other)
  861. {
  862. #if !NO_PERF
  863. return new SkipUntil<TSource, TOther>(source, other);
  864. #else
  865. return new AnonymousObservable<TSource>(observer =>
  866. {
  867. var sourceSubscription = new SingleAssignmentDisposable();
  868. var otherSubscription = new SingleAssignmentDisposable();
  869. var open = false;
  870. var gate = new object();
  871. sourceSubscription.Disposable = source.Synchronize(gate).Subscribe(
  872. x =>
  873. {
  874. if (open)
  875. observer.OnNext(x);
  876. },
  877. observer.OnError, // BREAKING CHANGE - Error propagation was guarded by "other" source in v1.0.10621 (due to materialization).
  878. () =>
  879. {
  880. if (open)
  881. observer.OnCompleted();
  882. }
  883. );
  884. otherSubscription.Disposable = other.Synchronize(gate).Subscribe(
  885. x =>
  886. {
  887. open = true;
  888. otherSubscription.Dispose();
  889. },
  890. observer.OnError
  891. );
  892. return new CompositeDisposable(sourceSubscription, otherSubscription);
  893. });
  894. #endif
  895. }
  896. #endregion
  897. #region + Switch +
  898. public virtual IObservable<TSource> Switch<TSource>(IObservable<IObservable<TSource>> sources)
  899. {
  900. return Switch_<TSource>(sources);
  901. }
  902. public virtual IObservable<TSource> Switch<TSource>(IObservable<Task<TSource>> sources)
  903. {
  904. return Switch_<TSource>(Select(sources, TaskObservableExtensions.ToObservable));
  905. }
  906. private IObservable<TSource> Switch_<TSource>(IObservable<IObservable<TSource>> sources)
  907. {
  908. #if !NO_PERF
  909. return new Switch<TSource>(sources);
  910. #else
  911. return new AnonymousObservable<TSource>(observer =>
  912. {
  913. var gate = new object();
  914. var innerSubscription = new SerialDisposable();
  915. var isStopped = false;
  916. var latest = 0UL;
  917. var hasLatest = false;
  918. var subscription = sources.Subscribe(
  919. innerSource =>
  920. {
  921. var id = default(ulong);
  922. lock (gate)
  923. {
  924. id = unchecked(++latest);
  925. hasLatest = true;
  926. }
  927. var d = new SingleAssignmentDisposable();
  928. innerSubscription.Disposable = d;
  929. d.Disposable = innerSource.Subscribe(
  930. x =>
  931. {
  932. lock (gate)
  933. {
  934. if (latest == id)
  935. observer.OnNext(x);
  936. }
  937. },
  938. exception =>
  939. {
  940. lock (gate)
  941. {
  942. if (latest == id)
  943. observer.OnError(exception);
  944. }
  945. },
  946. () =>
  947. {
  948. lock (gate)
  949. {
  950. if (latest == id)
  951. {
  952. hasLatest = false;
  953. if (isStopped)
  954. observer.OnCompleted();
  955. }
  956. }
  957. });
  958. },
  959. exception =>
  960. {
  961. lock (gate)
  962. observer.OnError(exception);
  963. },
  964. () =>
  965. {
  966. lock (gate)
  967. {
  968. isStopped = true;
  969. if (!hasLatest)
  970. observer.OnCompleted();
  971. }
  972. });
  973. return new CompositeDisposable(subscription, innerSubscription);
  974. });
  975. #endif
  976. }
  977. #endregion
  978. #region + TakeUntil +
  979. public virtual IObservable<TSource> TakeUntil<TSource, TOther>(IObservable<TSource> source, IObservable<TOther> other)
  980. {
  981. #if !NO_PERF
  982. return new TakeUntil<TSource, TOther>(source, other);
  983. #else
  984. return new AnonymousObservable<TSource>(observer =>
  985. {
  986. var sourceSubscription = new SingleAssignmentDisposable();
  987. var otherSubscription = new SingleAssignmentDisposable();
  988. var gate = new object();
  989. // COMPAT - Order of Subscribe calls per v1.0.10621
  990. otherSubscription.Disposable = other.Synchronize(gate).Subscribe(
  991. x =>
  992. {
  993. observer.OnCompleted();
  994. },
  995. observer.OnError
  996. );
  997. sourceSubscription.Disposable = source.Synchronize(gate).Finally(otherSubscription.Dispose).Subscribe(observer);
  998. return new CompositeDisposable(sourceSubscription, otherSubscription);
  999. });
  1000. #endif
  1001. }
  1002. #endregion
  1003. #region + Window +
  1004. public virtual IObservable<IObservable<TSource>> Window<TSource, TWindowClosing>(IObservable<TSource> source, Func<IObservable<TWindowClosing>> windowClosingSelector)
  1005. {
  1006. #if !NO_PERF
  1007. return new Window<TSource, TWindowClosing>(source, windowClosingSelector);
  1008. #else
  1009. return new AnonymousObservable<IObservable<TSource>>(observer =>
  1010. {
  1011. var window = new Subject<TSource>();
  1012. var gate = new object();
  1013. var m = new SerialDisposable();
  1014. var d = new CompositeDisposable(2) { m };
  1015. var r = new RefCountDisposable(d);
  1016. observer.OnNext(window.AddRef(r));
  1017. d.Add(source.SubscribeSafe(new AnonymousObserver<TSource>(
  1018. x =>
  1019. {
  1020. lock (gate)
  1021. {
  1022. window.OnNext(x);
  1023. }
  1024. },
  1025. ex =>
  1026. {
  1027. lock (gate)
  1028. {
  1029. window.OnError(ex);
  1030. observer.OnError(ex);
  1031. }
  1032. },
  1033. () =>
  1034. {
  1035. lock (gate)
  1036. {
  1037. window.OnCompleted();
  1038. observer.OnCompleted();
  1039. }
  1040. })));
  1041. var l = new AsyncLock();
  1042. Action createWindowClose = null;
  1043. createWindowClose = () =>
  1044. {
  1045. var windowClose = default(IObservable<TWindowClosing>);
  1046. try
  1047. {
  1048. windowClose = windowClosingSelector();
  1049. }
  1050. catch (Exception exception)
  1051. {
  1052. lock (gate)
  1053. {
  1054. observer.OnError(exception);
  1055. }
  1056. return;
  1057. }
  1058. var m1 = new SingleAssignmentDisposable();
  1059. m.Disposable = m1;
  1060. m1.Disposable = windowClose.Take(1).SubscribeSafe(new AnonymousObserver<TWindowClosing>(
  1061. Stubs<TWindowClosing>.Ignore,
  1062. ex =>
  1063. {
  1064. lock (gate)
  1065. {
  1066. window.OnError(ex);
  1067. observer.OnError(ex);
  1068. }
  1069. },
  1070. () =>
  1071. {
  1072. lock (gate)
  1073. {
  1074. window.OnCompleted();
  1075. window = new Subject<TSource>();
  1076. observer.OnNext(window.AddRef(r));
  1077. }
  1078. l.Wait(createWindowClose);
  1079. }));
  1080. };
  1081. l.Wait(createWindowClose);
  1082. return r;
  1083. });
  1084. #endif
  1085. }
  1086. public virtual IObservable<IObservable<TSource>> Window<TSource, TWindowOpening, TWindowClosing>(IObservable<TSource> source, IObservable<TWindowOpening> windowOpenings, Func<TWindowOpening, IObservable<TWindowClosing>> windowClosingSelector)
  1087. {
  1088. return windowOpenings.GroupJoin(source, windowClosingSelector, _ => Observable.Empty<Unit>(), (_, window) => window);
  1089. }
  1090. public virtual IObservable<IObservable<TSource>> Window<TSource, TWindowBoundary>(IObservable<TSource> source, IObservable<TWindowBoundary> windowBoundaries)
  1091. {
  1092. #if !NO_PERF
  1093. return new Window<TSource, TWindowBoundary>(source, windowBoundaries);
  1094. #else
  1095. return new AnonymousObservable<IObservable<TSource>>(observer =>
  1096. {
  1097. var window = new Subject<TSource>();
  1098. var gate = new object();
  1099. var d = new CompositeDisposable(2);
  1100. var r = new RefCountDisposable(d);
  1101. observer.OnNext(window.AddRef(r));
  1102. d.Add(source.SubscribeSafe(new AnonymousObserver<TSource>(
  1103. x =>
  1104. {
  1105. lock (gate)
  1106. {
  1107. window.OnNext(x);
  1108. }
  1109. },
  1110. ex =>
  1111. {
  1112. lock (gate)
  1113. {
  1114. window.OnError(ex);
  1115. observer.OnError(ex);
  1116. }
  1117. },
  1118. () =>
  1119. {
  1120. lock (gate)
  1121. {
  1122. window.OnCompleted();
  1123. observer.OnCompleted();
  1124. }
  1125. }
  1126. )));
  1127. d.Add(windowBoundaries.SubscribeSafe(new AnonymousObserver<TWindowBoundary>(
  1128. w =>
  1129. {
  1130. lock (gate)
  1131. {
  1132. window.OnCompleted();
  1133. window = new Subject<TSource>();
  1134. observer.OnNext(window.AddRef(r));
  1135. }
  1136. },
  1137. ex =>
  1138. {
  1139. lock (gate)
  1140. {
  1141. window.OnError(ex);
  1142. observer.OnError(ex);
  1143. }
  1144. },
  1145. () =>
  1146. {
  1147. lock (gate)
  1148. {
  1149. window.OnCompleted();
  1150. observer.OnCompleted();
  1151. }
  1152. }
  1153. )));
  1154. return r;
  1155. });
  1156. #endif
  1157. }
  1158. #endregion
  1159. #region + WithLatestFrom +
  1160. public virtual IObservable<TResult> WithLatestFrom<TFirst, TSecond, TResult>(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
  1161. {
  1162. return new WithLatestFrom<TFirst, TSecond, TResult>(first, second, resultSelector);
  1163. }
  1164. #endregion
  1165. #region + Zip +
  1166. public virtual IObservable<TResult> Zip<TFirst, TSecond, TResult>(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
  1167. {
  1168. #if !NO_PERF
  1169. return new Zip<TFirst, TSecond, TResult>(first, second, resultSelector);
  1170. #else
  1171. return new AnonymousObservable<TResult>(observer =>
  1172. {
  1173. var queueLeft = new Queue<TFirst>();
  1174. var queueRight = new Queue<TSecond>();
  1175. var leftDone = false;
  1176. var rightDone = false;
  1177. var leftSubscription = new SingleAssignmentDisposable();
  1178. var rightSubscription = new SingleAssignmentDisposable();
  1179. var gate = new object();
  1180. leftSubscription.Disposable = first.Synchronize(gate).Subscribe(
  1181. l =>
  1182. {
  1183. if (queueRight.Count > 0)
  1184. {
  1185. var r = queueRight.Dequeue();
  1186. var res = default(TResult);
  1187. try
  1188. {
  1189. res = resultSelector(l, r);
  1190. }
  1191. catch (Exception ex)
  1192. {
  1193. observer.OnError(ex);
  1194. return;
  1195. }
  1196. observer.OnNext(res);
  1197. }
  1198. else
  1199. {
  1200. if (rightDone)
  1201. {
  1202. observer.OnCompleted();
  1203. return;
  1204. }
  1205. queueLeft.Enqueue(l);
  1206. }
  1207. },
  1208. observer.OnError,
  1209. () =>
  1210. {
  1211. leftDone = true;
  1212. if (rightDone)
  1213. {
  1214. observer.OnCompleted();
  1215. return;
  1216. }
  1217. }
  1218. );
  1219. rightSubscription.Disposable = second.Synchronize(gate).Subscribe(
  1220. r =>
  1221. {
  1222. if (queueLeft.Count > 0)
  1223. {
  1224. var l = queueLeft.Dequeue();
  1225. var res = default(TResult);
  1226. try
  1227. {
  1228. res = resultSelector(l, r);
  1229. }
  1230. catch (Exception ex)
  1231. {
  1232. observer.OnError(ex);
  1233. return;
  1234. }
  1235. observer.OnNext(res);
  1236. }
  1237. else
  1238. {
  1239. if (leftDone)
  1240. {
  1241. observer.OnCompleted();
  1242. return;
  1243. }
  1244. queueRight.Enqueue(r);
  1245. }
  1246. },
  1247. observer.OnError,
  1248. () =>
  1249. {
  1250. rightDone = true;
  1251. if (leftDone)
  1252. {
  1253. observer.OnCompleted();
  1254. return;
  1255. }
  1256. }
  1257. );
  1258. return new CompositeDisposable(leftSubscription, rightSubscription, Disposable.Create(() => { queueLeft.Clear(); queueRight.Clear(); }));
  1259. });
  1260. #endif
  1261. }
  1262. public virtual IObservable<TResult> Zip<TSource, TResult>(IEnumerable<IObservable<TSource>> sources, Func<IList<TSource>, TResult> resultSelector)
  1263. {
  1264. return Zip_<TSource>(sources).Select(resultSelector);
  1265. }
  1266. public virtual IObservable<IList<TSource>> Zip<TSource>(IEnumerable<IObservable<TSource>> sources)
  1267. {
  1268. return Zip_<TSource>(sources);
  1269. }
  1270. public virtual IObservable<IList<TSource>> Zip<TSource>(params IObservable<TSource>[] sources)
  1271. {
  1272. return Zip_<TSource>(sources);
  1273. }
  1274. private static IObservable<IList<TSource>> Zip_<TSource>(IEnumerable<IObservable<TSource>> sources)
  1275. {
  1276. #if !NO_PERF
  1277. return new Zip<TSource>(sources);
  1278. #else
  1279. return new AnonymousObservable<IList<TSource>>(observer =>
  1280. {
  1281. var srcs = sources.ToArray();
  1282. var N = srcs.Length;
  1283. var queues = new Queue<TSource>[N];
  1284. for (int i = 0; i < N; i++)
  1285. queues[i] = new Queue<TSource>();
  1286. var isDone = new bool[N];
  1287. var next = new Action<int>(i =>
  1288. {
  1289. if (queues.All(q => q.Count > 0))
  1290. {
  1291. var res = queues.Select(q => q.Dequeue()).ToList();
  1292. observer.OnNext(res);
  1293. }
  1294. else if (isDone.Where((x, j) => j != i).All(Stubs<bool>.I))
  1295. {
  1296. observer.OnCompleted();
  1297. return;
  1298. }
  1299. });
  1300. var done = new Action<int>(i =>
  1301. {
  1302. isDone[i] = true;
  1303. if (isDone.All(Stubs<bool>.I))
  1304. {
  1305. observer.OnCompleted();
  1306. return;
  1307. }
  1308. });
  1309. var subscriptions = new SingleAssignmentDisposable[N];
  1310. var gate = new object();
  1311. for (int i = 0; i < N; i++)
  1312. {
  1313. var j = i;
  1314. subscriptions[j] = new SingleAssignmentDisposable
  1315. {
  1316. Disposable = srcs[j].Synchronize(gate).Subscribe(
  1317. x =>
  1318. {
  1319. queues[j].Enqueue(x);
  1320. next(j);
  1321. },
  1322. observer.OnError,
  1323. () =>
  1324. {
  1325. done(j);
  1326. }
  1327. )
  1328. };
  1329. }
  1330. return new CompositeDisposable(subscriptions) { Disposable.Create(() => { foreach (var q in queues) q.Clear(); }) };
  1331. });
  1332. #endif
  1333. }
  1334. #if !NO_PERF
  1335. /* The following code is generated by a tool checked in to $/.../Source/Tools/CodeGenerators. */
  1336. #region Zip auto-generated code (6/10/2012 8:15:28 PM)
  1337. public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, Func<TSource1, TSource2, TSource3, TResult> resultSelector)
  1338. {
  1339. return new Zip<TSource1, TSource2, TSource3, TResult>(source1, source2, source3, resultSelector);
  1340. }
  1341. public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, Func<TSource1, TSource2, TSource3, TSource4, TResult> resultSelector)
  1342. {
  1343. return new Zip<TSource1, TSource2, TSource3, TSource4, TResult>(source1, source2, source3, source4, resultSelector);
  1344. }
  1345. public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TResult> resultSelector)
  1346. {
  1347. return new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TResult>(source1, source2, source3, source4, source5, resultSelector);
  1348. }
  1349. public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TResult> resultSelector)
  1350. {
  1351. return new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TResult>(source1, source2, source3, source4, source5, source6, resultSelector);
  1352. }
  1353. public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TResult> resultSelector)
  1354. {
  1355. return new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TResult>(source1, source2, source3, source4, source5, source6, source7, resultSelector);
  1356. }
  1357. public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult> resultSelector)
  1358. {
  1359. return new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, resultSelector);
  1360. }
  1361. public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TResult> resultSelector)
  1362. {
  1363. return new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, resultSelector);
  1364. }
  1365. public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TResult> resultSelector)
  1366. {
  1367. return new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, resultSelector);
  1368. }
  1369. public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TResult> resultSelector)
  1370. {
  1371. return new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, resultSelector);
  1372. }
  1373. public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TResult> resultSelector)
  1374. {
  1375. return new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, resultSelector);
  1376. }
  1377. public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12, IObservable<TSource13> source13, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TResult> resultSelector)
  1378. {
  1379. return new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, resultSelector);
  1380. }
  1381. public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12, IObservable<TSource13> source13, IObservable<TSource14> source14, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TResult> resultSelector)
  1382. {
  1383. return new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, source14, resultSelector);
  1384. }
  1385. public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12, IObservable<TSource13> source13, IObservable<TSource14> source14, IObservable<TSource15> source15, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TResult> resultSelector)
  1386. {
  1387. return new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, source14, source15, resultSelector);
  1388. }
  1389. public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12, IObservable<TSource13> source13, IObservable<TSource14> source14, IObservable<TSource15> source15, IObservable<TSource16> source16, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16, TResult> resultSelector)
  1390. {
  1391. return new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, source14, source15, source16, resultSelector);
  1392. }
  1393. #endregion
  1394. #endif
  1395. public virtual IObservable<TResult> Zip<TFirst, TSecond, TResult>(IObservable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
  1396. {
  1397. #if !NO_PERF
  1398. return new Zip<TFirst, TSecond, TResult>(first, second, resultSelector);
  1399. #else
  1400. return new AnonymousObservable<TResult>(observer =>
  1401. {
  1402. var rightEnumerator = second.GetEnumerator();
  1403. var leftSubscription = first.Subscribe(left =>
  1404. {
  1405. var hasNext = false;
  1406. try
  1407. {
  1408. hasNext = rightEnumerator.MoveNext();
  1409. }
  1410. catch (Exception ex)
  1411. {
  1412. observer.OnError(ex);
  1413. return;
  1414. }
  1415. if (hasNext)
  1416. {
  1417. var right = default(TSecond);
  1418. try
  1419. {
  1420. right = rightEnumerator.Current;
  1421. }
  1422. catch (Exception ex)
  1423. {
  1424. observer.OnError(ex);
  1425. return;
  1426. }
  1427. TResult result;
  1428. try
  1429. {
  1430. result = resultSelector(left, right);
  1431. }
  1432. catch (Exception ex)
  1433. {
  1434. observer.OnError(ex);
  1435. return;
  1436. }
  1437. observer.OnNext(result);
  1438. }
  1439. else
  1440. {
  1441. observer.OnCompleted();
  1442. }
  1443. },
  1444. observer.OnError,
  1445. observer.OnCompleted
  1446. );
  1447. return new CompositeDisposable(leftSubscription, rightEnumerator);
  1448. });
  1449. #endif
  1450. }
  1451. #endregion
  1452. #region |> Helpers <|
  1453. #if NO_PERF
  1454. private static IObservable<TResult> Combine<TLeft, TRight, TResult>(IObservable<TLeft> leftSource, IObservable<TRight> rightSource, Func<IObserver<TResult>, IDisposable, IDisposable, IObserver<Either<Notification<TLeft>, Notification<TRight>>>> combinerSelector)
  1455. {
  1456. return new AnonymousObservable<TResult>(observer =>
  1457. {
  1458. var leftSubscription = new SingleAssignmentDisposable();
  1459. var rightSubscription = new SingleAssignmentDisposable();
  1460. var combiner = combinerSelector(observer, leftSubscription, rightSubscription);
  1461. var gate = new object();
  1462. leftSubscription.Disposable = leftSource.Materialize().Select(x => Either<Notification<TLeft>, Notification<TRight>>.CreateLeft(x)).Synchronize(gate).Subscribe(combiner);
  1463. rightSubscription.Disposable = rightSource.Materialize().Select(x => Either<Notification<TLeft>, Notification<TRight>>.CreateRight(x)).Synchronize(gate).Subscribe(combiner);
  1464. return new CompositeDisposable(leftSubscription, rightSubscription);
  1465. });
  1466. }
  1467. #endif
  1468. #endregion
  1469. }
  1470. }