CombineLatest.cs 85 KB


  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System;
  4. using System.Collections.Generic;
  5. using System.Collections.ObjectModel;
  6. using System.Linq;
  7. using System.Reactive.Disposables;
  8. namespace System.Reactive.Linq.ObservableImpl
  9. {
  10. #region Binary
  /// <summary>
  /// Producer for the two-source CombineLatest operator. It holds both sources and the
  /// selector, and creates a sink that subscribes to the sources when run.
  /// </summary>
  internal class CombineLatest<TFirst, TSecond, TResult> : Producer<TResult>
  {
      private readonly IObservable<TFirst> _first;
      private readonly IObservable<TSecond> _second;
      private readonly Func<TFirst, TSecond, TResult> _resultSelector;

      /// <summary>
      /// Stores the two sources and the selector; no subscription happens here.
      /// </summary>
      public CombineLatest(
          IObservable<TFirst> first,
          IObservable<TSecond> second,
          Func<TFirst, TSecond, TResult> resultSelector)
      {
          _first = first;
          _second = second;
          _resultSelector = resultSelector;
      }

      /// <summary>
      /// Creates the sink, hands it to <paramref name="setSink"/>, and returns the
      /// disposable produced by starting it.
      /// </summary>
      protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var combiner = new _(this, observer, cancel);
          setSink(combiner);
          return combiner.Run();
      }

      /// <summary>
      /// Sink that owns the shared gate and wires the two per-source observers together.
      /// </summary>
      private class _ : Sink<TResult>
      {
          private readonly CombineLatest<TFirst, TSecond, TResult> _parent;

          // Lock object shared by both inner observers; created in Run().
          private object _gate;

          public _(CombineLatest<TFirst, TSecond, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              _parent = parent;
          }

          /// <summary>
          /// Creates both inner observers, cross-links them, subscribes to the first and
          /// then the second source, and returns a composite of both subscriptions.
          /// </summary>
          public IDisposable Run()
          {
              _gate = new object();

              var firstHandle = new SingleAssignmentDisposable();
              var secondHandle = new SingleAssignmentDisposable();

              var firstObserver = new F(this, firstHandle);
              var secondObserver = new S(this, secondHandle);

              // Each side needs to see the other's latest value and completion flag.
              firstObserver.Other = secondObserver;
              secondObserver.Other = firstObserver;

              firstHandle.Disposable = _parent._first.SubscribeSafe(firstObserver);
              secondHandle.Disposable = _parent._second.SubscribeSafe(secondObserver);

              return new CompositeDisposable(firstHandle, secondHandle);
          }

          /// <summary>
          /// Observer for the first source. Tracks its latest value and completion state.
          /// </summary>
          private class F : IObserver<TFirst>
          {
              private readonly _ _sink;
              private readonly IDisposable _subscription;
              private S _peer;

              public F(_ parent, IDisposable self)
              {
                  _sink = parent;
                  _subscription = self;
              }

              public S Other { set { _peer = value; } }

              public bool HasValue { get; private set; }

              public TFirst Value { get; private set; }

              public bool Done { get; private set; }

              /// <summary>
              /// Records the value; emits a combined result if the second source already has
              /// a value, or completes the output if the second source finished without one.
              /// </summary>
              public void OnNext(TFirst value)
              {
                  lock (_sink._gate)
                  {
                      HasValue = true;
                      Value = value;

                      if (!_peer.HasValue)
                      {
                          // The peer ended before producing anything, so no pair can ever form.
                          if (_peer.Done)
                          {
                              _sink._observer.OnCompleted();
                              _sink.Dispose();
                          }

                          return;
                      }

                      TResult combined;
                      try
                      {
                          combined = _sink._parent._resultSelector(value, _peer.Value);
                      }
                      catch (Exception ex)
                      {
                          _sink._observer.OnError(ex);
                          _sink.Dispose();
                          return;
                      }

                      _sink._observer.OnNext(combined);
                  }
              }

              /// <summary>
              /// Forwards the error downstream and disposes the sink.
              /// </summary>
              public void OnError(Exception error)
              {
                  lock (_sink._gate)
                  {
                      _sink._observer.OnError(error);
                      _sink.Dispose();
                  }
              }

              /// <summary>
              /// Marks this side as done; completes the output when the other side is done
              /// too, otherwise only disposes this side's own subscription.
              /// </summary>
              public void OnCompleted()
              {
                  lock (_sink._gate)
                  {
                      Done = true;

                      if (!_peer.Done)
                      {
                          _subscription.Dispose();
                          return;
                      }

                      _sink._observer.OnCompleted();
                      _sink.Dispose();
                  }
              }
          }

          /// <summary>
          /// Observer for the second source. Tracks its latest value and completion state.
          /// </summary>
          private class S : IObserver<TSecond>
          {
              private readonly _ _sink;
              private readonly IDisposable _subscription;
              private F _peer;

              public S(_ parent, IDisposable self)
              {
                  _sink = parent;
                  _subscription = self;
              }

              public F Other { set { _peer = value; } }

              public bool HasValue { get; private set; }

              public TSecond Value { get; private set; }

              public bool Done { get; private set; }

              /// <summary>
              /// Records the value; emits a combined result if the first source already has
              /// a value, or completes the output if the first source finished without one.
              /// </summary>
              public void OnNext(TSecond value)
              {
                  lock (_sink._gate)
                  {
                      HasValue = true;
                      Value = value;

                      if (!_peer.HasValue)
                      {
                          // The peer ended before producing anything, so no pair can ever form.
                          if (_peer.Done)
                          {
                              _sink._observer.OnCompleted();
                              _sink.Dispose();
                          }

                          return;
                      }

                      TResult combined;
                      try
                      {
                          combined = _sink._parent._resultSelector(_peer.Value, value);
                      }
                      catch (Exception ex)
                      {
                          _sink._observer.OnError(ex);
                          _sink.Dispose();
                          return;
                      }

                      _sink._observer.OnNext(combined);
                  }
              }

              /// <summary>
              /// Forwards the error downstream and disposes the sink.
              /// </summary>
              public void OnError(Exception error)
              {
                  lock (_sink._gate)
                  {
                      _sink._observer.OnError(error);
                      _sink.Dispose();
                  }
              }

              /// <summary>
              /// Marks this side as done; completes the output when the other side is done
              /// too, otherwise only disposes this side's own subscription.
              /// </summary>
              public void OnCompleted()
              {
                  lock (_sink._gate)
                  {
                      Done = true;

                      if (!_peer.Done)
                      {
                          _subscription.Dispose();
                          return;
                      }

                      _sink._observer.OnCompleted();
                      _sink.Dispose();
                  }
              }
          }
      }
  }
  190. #endregion
  191. #region [3,16]-ary
  192. /* The following code is generated by a tool checked in to $/.../Source/Tools/CodeGenerators. */
  193. #region CombineLatest auto-generated code (6/10/2012 7:22:14 PM)
  /// <summary>
  /// Producer that combines three observable sources through a result selector.
  /// </summary>
  internal class CombineLatest<T1, T2, T3, TResult> : Producer<TResult>
  {
      private readonly IObservable<T1> _source1;
      private readonly IObservable<T2> _source2;
      private readonly IObservable<T3> _source3;
      private readonly Func<T1, T2, T3, TResult> _resultSelector;

      /// <summary>
      /// Stores the sources and the selector; no subscription happens here.
      /// </summary>
      public CombineLatest(
          IObservable<T1> source1,
          IObservable<T2> source2,
          IObservable<T3> source3,
          Func<T1, T2, T3, TResult> resultSelector)
      {
          _source1 = source1;
          _source2 = source2;
          _source3 = source3;
          _resultSelector = resultSelector;
      }

      /// <summary>
      /// Creates the sink, hands it to <paramref name="setSink"/>, and starts it.
      /// </summary>
      protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var combiner = new _(this, observer, cancel);
          setSink(combiner);
          return combiner.Run();
      }

      /// <summary>
      /// Sink holding one observer per source; the base class receives the source count (3).
      /// </summary>
      private class _ : CombineLatestSink<TResult>
      {
          private readonly CombineLatest<T1, T2, T3, TResult> _parent;

          private CombineLatestObserver<T1> _observer1;
          private CombineLatestObserver<T2> _observer2;
          private CombineLatestObserver<T3> _observer3;

          public _(CombineLatest<T1, T2, T3, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
              : base(3, observer, cancel)
          {
              _parent = parent;
          }

          /// <summary>
          /// Creates every observer first, then subscribes to the sources in order, and
          /// returns a composite of all subscriptions.
          /// </summary>
          public IDisposable Run()
          {
              var handles = new SingleAssignmentDisposable[3];
              for (var slot = 0; slot < handles.Length; ++slot)
              {
                  handles[slot] = new SingleAssignmentDisposable();
              }

              // Third argument is the zero-based position of the source.
              _observer1 = new CombineLatestObserver<T1>(_gate, this, 0, handles[0]);
              _observer2 = new CombineLatestObserver<T2>(_gate, this, 1, handles[1]);
              _observer3 = new CombineLatestObserver<T3>(_gate, this, 2, handles[2]);

              handles[0].Disposable = _parent._source1.SubscribeSafe(_observer1);
              handles[1].Disposable = _parent._source2.SubscribeSafe(_observer2);
              handles[2].Disposable = _parent._source3.SubscribeSafe(_observer3);

              return new CompositeDisposable(handles);
          }

          /// <summary>
          /// Applies the result selector to the value currently held by each observer.
          /// </summary>
          protected override TResult GetResult()
          {
              return _parent._resultSelector(
                  _observer1.Value,
                  _observer2.Value,
                  _observer3.Value);
          }
      }
  }
  /// <summary>
  /// Producer that combines four observable sources through a result selector.
  /// </summary>
  internal class CombineLatest<T1, T2, T3, T4, TResult> : Producer<TResult>
  {
      private readonly IObservable<T1> _source1;
      private readonly IObservable<T2> _source2;
      private readonly IObservable<T3> _source3;
      private readonly IObservable<T4> _source4;
      private readonly Func<T1, T2, T3, T4, TResult> _resultSelector;

      /// <summary>
      /// Stores the sources and the selector; no subscription happens here.
      /// </summary>
      public CombineLatest(
          IObservable<T1> source1,
          IObservable<T2> source2,
          IObservable<T3> source3,
          IObservable<T4> source4,
          Func<T1, T2, T3, T4, TResult> resultSelector)
      {
          _source1 = source1;
          _source2 = source2;
          _source3 = source3;
          _source4 = source4;
          _resultSelector = resultSelector;
      }

      /// <summary>
      /// Creates the sink, hands it to <paramref name="setSink"/>, and starts it.
      /// </summary>
      protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var combiner = new _(this, observer, cancel);
          setSink(combiner);
          return combiner.Run();
      }

      /// <summary>
      /// Sink holding one observer per source; the base class receives the source count (4).
      /// </summary>
      private class _ : CombineLatestSink<TResult>
      {
          private readonly CombineLatest<T1, T2, T3, T4, TResult> _parent;

          private CombineLatestObserver<T1> _observer1;
          private CombineLatestObserver<T2> _observer2;
          private CombineLatestObserver<T3> _observer3;
          private CombineLatestObserver<T4> _observer4;

          public _(CombineLatest<T1, T2, T3, T4, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
              : base(4, observer, cancel)
          {
              _parent = parent;
          }

          /// <summary>
          /// Creates every observer first, then subscribes to the sources in order, and
          /// returns a composite of all subscriptions.
          /// </summary>
          public IDisposable Run()
          {
              var handles = new SingleAssignmentDisposable[4];
              for (var slot = 0; slot < handles.Length; ++slot)
              {
                  handles[slot] = new SingleAssignmentDisposable();
              }

              // Third argument is the zero-based position of the source.
              _observer1 = new CombineLatestObserver<T1>(_gate, this, 0, handles[0]);
              _observer2 = new CombineLatestObserver<T2>(_gate, this, 1, handles[1]);
              _observer3 = new CombineLatestObserver<T3>(_gate, this, 2, handles[2]);
              _observer4 = new CombineLatestObserver<T4>(_gate, this, 3, handles[3]);

              handles[0].Disposable = _parent._source1.SubscribeSafe(_observer1);
              handles[1].Disposable = _parent._source2.SubscribeSafe(_observer2);
              handles[2].Disposable = _parent._source3.SubscribeSafe(_observer3);
              handles[3].Disposable = _parent._source4.SubscribeSafe(_observer4);

              return new CompositeDisposable(handles);
          }

          /// <summary>
          /// Applies the result selector to the value currently held by each observer.
          /// </summary>
          protected override TResult GetResult()
          {
              return _parent._resultSelector(
                  _observer1.Value,
                  _observer2.Value,
                  _observer3.Value,
                  _observer4.Value);
          }
      }
  }
  297. #if !NO_LARGEARITY
  /// <summary>
  /// Producer that combines five observable sources through a result selector.
  /// </summary>
  internal class CombineLatest<T1, T2, T3, T4, T5, TResult> : Producer<TResult>
  {
      private readonly IObservable<T1> _source1;
      private readonly IObservable<T2> _source2;
      private readonly IObservable<T3> _source3;
      private readonly IObservable<T4> _source4;
      private readonly IObservable<T5> _source5;
      private readonly Func<T1, T2, T3, T4, T5, TResult> _resultSelector;

      /// <summary>
      /// Stores the sources and the selector; no subscription happens here.
      /// </summary>
      public CombineLatest(
          IObservable<T1> source1,
          IObservable<T2> source2,
          IObservable<T3> source3,
          IObservable<T4> source4,
          IObservable<T5> source5,
          Func<T1, T2, T3, T4, T5, TResult> resultSelector)
      {
          _source1 = source1;
          _source2 = source2;
          _source3 = source3;
          _source4 = source4;
          _source5 = source5;
          _resultSelector = resultSelector;
      }

      /// <summary>
      /// Creates the sink, hands it to <paramref name="setSink"/>, and starts it.
      /// </summary>
      protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var combiner = new _(this, observer, cancel);
          setSink(combiner);
          return combiner.Run();
      }

      /// <summary>
      /// Sink holding one observer per source; the base class receives the source count (5).
      /// </summary>
      private class _ : CombineLatestSink<TResult>
      {
          private readonly CombineLatest<T1, T2, T3, T4, T5, TResult> _parent;

          private CombineLatestObserver<T1> _observer1;
          private CombineLatestObserver<T2> _observer2;
          private CombineLatestObserver<T3> _observer3;
          private CombineLatestObserver<T4> _observer4;
          private CombineLatestObserver<T5> _observer5;

          public _(CombineLatest<T1, T2, T3, T4, T5, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
              : base(5, observer, cancel)
          {
              _parent = parent;
          }

          /// <summary>
          /// Creates every observer first, then subscribes to the sources in order, and
          /// returns a composite of all subscriptions.
          /// </summary>
          public IDisposable Run()
          {
              var handles = new SingleAssignmentDisposable[5];
              for (var slot = 0; slot < handles.Length; ++slot)
              {
                  handles[slot] = new SingleAssignmentDisposable();
              }

              // Third argument is the zero-based position of the source.
              _observer1 = new CombineLatestObserver<T1>(_gate, this, 0, handles[0]);
              _observer2 = new CombineLatestObserver<T2>(_gate, this, 1, handles[1]);
              _observer3 = new CombineLatestObserver<T3>(_gate, this, 2, handles[2]);
              _observer4 = new CombineLatestObserver<T4>(_gate, this, 3, handles[3]);
              _observer5 = new CombineLatestObserver<T5>(_gate, this, 4, handles[4]);

              handles[0].Disposable = _parent._source1.SubscribeSafe(_observer1);
              handles[1].Disposable = _parent._source2.SubscribeSafe(_observer2);
              handles[2].Disposable = _parent._source3.SubscribeSafe(_observer3);
              handles[3].Disposable = _parent._source4.SubscribeSafe(_observer4);
              handles[4].Disposable = _parent._source5.SubscribeSafe(_observer5);

              return new CompositeDisposable(handles);
          }

          /// <summary>
          /// Applies the result selector to the value currently held by each observer.
          /// </summary>
          protected override TResult GetResult()
          {
              return _parent._resultSelector(
                  _observer1.Value,
                  _observer2.Value,
                  _observer3.Value,
                  _observer4.Value,
                  _observer5.Value);
          }
      }
  }
  /// <summary>
  /// Producer that combines six observable sources through a result selector.
  /// </summary>
  internal class CombineLatest<T1, T2, T3, T4, T5, T6, TResult> : Producer<TResult>
  {
      private readonly IObservable<T1> _source1;
      private readonly IObservable<T2> _source2;
      private readonly IObservable<T3> _source3;
      private readonly IObservable<T4> _source4;
      private readonly IObservable<T5> _source5;
      private readonly IObservable<T6> _source6;
      private readonly Func<T1, T2, T3, T4, T5, T6, TResult> _resultSelector;

      /// <summary>
      /// Stores the sources and the selector; no subscription happens here.
      /// </summary>
      public CombineLatest(
          IObservable<T1> source1,
          IObservable<T2> source2,
          IObservable<T3> source3,
          IObservable<T4> source4,
          IObservable<T5> source5,
          IObservable<T6> source6,
          Func<T1, T2, T3, T4, T5, T6, TResult> resultSelector)
      {
          _source1 = source1;
          _source2 = source2;
          _source3 = source3;
          _source4 = source4;
          _source5 = source5;
          _source6 = source6;
          _resultSelector = resultSelector;
      }

      /// <summary>
      /// Creates the sink, hands it to <paramref name="setSink"/>, and starts it.
      /// </summary>
      protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var combiner = new _(this, observer, cancel);
          setSink(combiner);
          return combiner.Run();
      }

      /// <summary>
      /// Sink holding one observer per source; the base class receives the source count (6).
      /// </summary>
      private class _ : CombineLatestSink<TResult>
      {
          private readonly CombineLatest<T1, T2, T3, T4, T5, T6, TResult> _parent;

          private CombineLatestObserver<T1> _observer1;
          private CombineLatestObserver<T2> _observer2;
          private CombineLatestObserver<T3> _observer3;
          private CombineLatestObserver<T4> _observer4;
          private CombineLatestObserver<T5> _observer5;
          private CombineLatestObserver<T6> _observer6;

          public _(CombineLatest<T1, T2, T3, T4, T5, T6, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
              : base(6, observer, cancel)
          {
              _parent = parent;
          }

          /// <summary>
          /// Creates every observer first, then subscribes to the sources in order, and
          /// returns a composite of all subscriptions.
          /// </summary>
          public IDisposable Run()
          {
              var handles = new SingleAssignmentDisposable[6];
              for (var slot = 0; slot < handles.Length; ++slot)
              {
                  handles[slot] = new SingleAssignmentDisposable();
              }

              // Third argument is the zero-based position of the source.
              _observer1 = new CombineLatestObserver<T1>(_gate, this, 0, handles[0]);
              _observer2 = new CombineLatestObserver<T2>(_gate, this, 1, handles[1]);
              _observer3 = new CombineLatestObserver<T3>(_gate, this, 2, handles[2]);
              _observer4 = new CombineLatestObserver<T4>(_gate, this, 3, handles[3]);
              _observer5 = new CombineLatestObserver<T5>(_gate, this, 4, handles[4]);
              _observer6 = new CombineLatestObserver<T6>(_gate, this, 5, handles[5]);

              handles[0].Disposable = _parent._source1.SubscribeSafe(_observer1);
              handles[1].Disposable = _parent._source2.SubscribeSafe(_observer2);
              handles[2].Disposable = _parent._source3.SubscribeSafe(_observer3);
              handles[3].Disposable = _parent._source4.SubscribeSafe(_observer4);
              handles[4].Disposable = _parent._source5.SubscribeSafe(_observer5);
              handles[5].Disposable = _parent._source6.SubscribeSafe(_observer6);

              return new CompositeDisposable(handles);
          }

          /// <summary>
          /// Applies the result selector to the value currently held by each observer.
          /// </summary>
          protected override TResult GetResult()
          {
              return _parent._resultSelector(
                  _observer1.Value,
                  _observer2.Value,
                  _observer3.Value,
                  _observer4.Value,
                  _observer5.Value,
                  _observer6.Value);
          }
      }
  }
  /// <summary>
  /// Producer that combines seven observable sources through a result selector.
  /// </summary>
  internal class CombineLatest<T1, T2, T3, T4, T5, T6, T7, TResult> : Producer<TResult>
  {
      private readonly IObservable<T1> _source1;
      private readonly IObservable<T2> _source2;
      private readonly IObservable<T3> _source3;
      private readonly IObservable<T4> _source4;
      private readonly IObservable<T5> _source5;
      private readonly IObservable<T6> _source6;
      private readonly IObservable<T7> _source7;
      private readonly Func<T1, T2, T3, T4, T5, T6, T7, TResult> _resultSelector;

      /// <summary>
      /// Stores the sources and the selector; no subscription happens here.
      /// </summary>
      public CombineLatest(
          IObservable<T1> source1,
          IObservable<T2> source2,
          IObservable<T3> source3,
          IObservable<T4> source4,
          IObservable<T5> source5,
          IObservable<T6> source6,
          IObservable<T7> source7,
          Func<T1, T2, T3, T4, T5, T6, T7, TResult> resultSelector)
      {
          _source1 = source1;
          _source2 = source2;
          _source3 = source3;
          _source4 = source4;
          _source5 = source5;
          _source6 = source6;
          _source7 = source7;
          _resultSelector = resultSelector;
      }

      /// <summary>
      /// Creates the sink, hands it to <paramref name="setSink"/>, and starts it.
      /// </summary>
      protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var combiner = new _(this, observer, cancel);
          setSink(combiner);
          return combiner.Run();
      }

      /// <summary>
      /// Sink holding one observer per source; the base class receives the source count (7).
      /// </summary>
      private class _ : CombineLatestSink<TResult>
      {
          private readonly CombineLatest<T1, T2, T3, T4, T5, T6, T7, TResult> _parent;

          private CombineLatestObserver<T1> _observer1;
          private CombineLatestObserver<T2> _observer2;
          private CombineLatestObserver<T3> _observer3;
          private CombineLatestObserver<T4> _observer4;
          private CombineLatestObserver<T5> _observer5;
          private CombineLatestObserver<T6> _observer6;
          private CombineLatestObserver<T7> _observer7;

          public _(CombineLatest<T1, T2, T3, T4, T5, T6, T7, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
              : base(7, observer, cancel)
          {
              _parent = parent;
          }

          /// <summary>
          /// Creates every observer first, then subscribes to the sources in order, and
          /// returns a composite of all subscriptions.
          /// </summary>
          public IDisposable Run()
          {
              var handles = new SingleAssignmentDisposable[7];
              for (var slot = 0; slot < handles.Length; ++slot)
              {
                  handles[slot] = new SingleAssignmentDisposable();
              }

              // Third argument is the zero-based position of the source.
              _observer1 = new CombineLatestObserver<T1>(_gate, this, 0, handles[0]);
              _observer2 = new CombineLatestObserver<T2>(_gate, this, 1, handles[1]);
              _observer3 = new CombineLatestObserver<T3>(_gate, this, 2, handles[2]);
              _observer4 = new CombineLatestObserver<T4>(_gate, this, 3, handles[3]);
              _observer5 = new CombineLatestObserver<T5>(_gate, this, 4, handles[4]);
              _observer6 = new CombineLatestObserver<T6>(_gate, this, 5, handles[5]);
              _observer7 = new CombineLatestObserver<T7>(_gate, this, 6, handles[6]);

              handles[0].Disposable = _parent._source1.SubscribeSafe(_observer1);
              handles[1].Disposable = _parent._source2.SubscribeSafe(_observer2);
              handles[2].Disposable = _parent._source3.SubscribeSafe(_observer3);
              handles[3].Disposable = _parent._source4.SubscribeSafe(_observer4);
              handles[4].Disposable = _parent._source5.SubscribeSafe(_observer5);
              handles[5].Disposable = _parent._source6.SubscribeSafe(_observer6);
              handles[6].Disposable = _parent._source7.SubscribeSafe(_observer7);

              return new CompositeDisposable(handles);
          }

          /// <summary>
          /// Applies the result selector to the value currently held by each observer.
          /// </summary>
          protected override TResult GetResult()
          {
              return _parent._resultSelector(
                  _observer1.Value,
                  _observer2.Value,
                  _observer3.Value,
                  _observer4.Value,
                  _observer5.Value,
                  _observer6.Value,
                  _observer7.Value);
          }
      }
  }
  /// <summary>
  /// Producer that combines eight observable sources through a result selector.
  /// </summary>
  internal class CombineLatest<T1, T2, T3, T4, T5, T6, T7, T8, TResult> : Producer<TResult>
  {
      private readonly IObservable<T1> _source1;
      private readonly IObservable<T2> _source2;
      private readonly IObservable<T3> _source3;
      private readonly IObservable<T4> _source4;
      private readonly IObservable<T5> _source5;
      private readonly IObservable<T6> _source6;
      private readonly IObservable<T7> _source7;
      private readonly IObservable<T8> _source8;
      private readonly Func<T1, T2, T3, T4, T5, T6, T7, T8, TResult> _resultSelector;

      /// <summary>
      /// Stores the sources and the selector; no subscription happens here.
      /// </summary>
      public CombineLatest(
          IObservable<T1> source1,
          IObservable<T2> source2,
          IObservable<T3> source3,
          IObservable<T4> source4,
          IObservable<T5> source5,
          IObservable<T6> source6,
          IObservable<T7> source7,
          IObservable<T8> source8,
          Func<T1, T2, T3, T4, T5, T6, T7, T8, TResult> resultSelector)
      {
          _source1 = source1;
          _source2 = source2;
          _source3 = source3;
          _source4 = source4;
          _source5 = source5;
          _source6 = source6;
          _source7 = source7;
          _source8 = source8;
          _resultSelector = resultSelector;
      }

      /// <summary>
      /// Creates the sink, hands it to <paramref name="setSink"/>, and starts it.
      /// </summary>
      protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var combiner = new _(this, observer, cancel);
          setSink(combiner);
          return combiner.Run();
      }

      /// <summary>
      /// Sink holding one observer per source; the base class receives the source count (8).
      /// </summary>
      private class _ : CombineLatestSink<TResult>
      {
          private readonly CombineLatest<T1, T2, T3, T4, T5, T6, T7, T8, TResult> _parent;

          private CombineLatestObserver<T1> _observer1;
          private CombineLatestObserver<T2> _observer2;
          private CombineLatestObserver<T3> _observer3;
          private CombineLatestObserver<T4> _observer4;
          private CombineLatestObserver<T5> _observer5;
          private CombineLatestObserver<T6> _observer6;
          private CombineLatestObserver<T7> _observer7;
          private CombineLatestObserver<T8> _observer8;

          public _(CombineLatest<T1, T2, T3, T4, T5, T6, T7, T8, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
              : base(8, observer, cancel)
          {
              _parent = parent;
          }

          /// <summary>
          /// Creates every observer first, then subscribes to the sources in order, and
          /// returns a composite of all subscriptions.
          /// </summary>
          public IDisposable Run()
          {
              var handles = new SingleAssignmentDisposable[8];
              for (var slot = 0; slot < handles.Length; ++slot)
              {
                  handles[slot] = new SingleAssignmentDisposable();
              }

              // Third argument is the zero-based position of the source.
              _observer1 = new CombineLatestObserver<T1>(_gate, this, 0, handles[0]);
              _observer2 = new CombineLatestObserver<T2>(_gate, this, 1, handles[1]);
              _observer3 = new CombineLatestObserver<T3>(_gate, this, 2, handles[2]);
              _observer4 = new CombineLatestObserver<T4>(_gate, this, 3, handles[3]);
              _observer5 = new CombineLatestObserver<T5>(_gate, this, 4, handles[4]);
              _observer6 = new CombineLatestObserver<T6>(_gate, this, 5, handles[5]);
              _observer7 = new CombineLatestObserver<T7>(_gate, this, 6, handles[6]);
              _observer8 = new CombineLatestObserver<T8>(_gate, this, 7, handles[7]);

              handles[0].Disposable = _parent._source1.SubscribeSafe(_observer1);
              handles[1].Disposable = _parent._source2.SubscribeSafe(_observer2);
              handles[2].Disposable = _parent._source3.SubscribeSafe(_observer3);
              handles[3].Disposable = _parent._source4.SubscribeSafe(_observer4);
              handles[4].Disposable = _parent._source5.SubscribeSafe(_observer5);
              handles[5].Disposable = _parent._source6.SubscribeSafe(_observer6);
              handles[6].Disposable = _parent._source7.SubscribeSafe(_observer7);
              handles[7].Disposable = _parent._source8.SubscribeSafe(_observer8);

              return new CompositeDisposable(handles);
          }

          /// <summary>
          /// Applies the result selector to the value currently held by each observer.
          /// </summary>
          protected override TResult GetResult()
          {
              return _parent._resultSelector(
                  _observer1.Value,
                  _observer2.Value,
                  _observer3.Value,
                  _observer4.Value,
                  _observer5.Value,
                  _observer6.Value,
                  _observer7.Value,
                  _observer8.Value);
          }
      }
  }
  /// <summary>
  /// Producer that combines nine observable sources through a result selector.
  /// </summary>
  internal class CombineLatest<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult> : Producer<TResult>
  {
      private readonly IObservable<T1> _source1;
      private readonly IObservable<T2> _source2;
      private readonly IObservable<T3> _source3;
      private readonly IObservable<T4> _source4;
      private readonly IObservable<T5> _source5;
      private readonly IObservable<T6> _source6;
      private readonly IObservable<T7> _source7;
      private readonly IObservable<T8> _source8;
      private readonly IObservable<T9> _source9;
      private readonly Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult> _resultSelector;

      /// <summary>
      /// Stores the sources and the selector; no subscription happens here.
      /// </summary>
      public CombineLatest(
          IObservable<T1> source1,
          IObservable<T2> source2,
          IObservable<T3> source3,
          IObservable<T4> source4,
          IObservable<T5> source5,
          IObservable<T6> source6,
          IObservable<T7> source7,
          IObservable<T8> source8,
          IObservable<T9> source9,
          Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult> resultSelector)
      {
          _source1 = source1;
          _source2 = source2;
          _source3 = source3;
          _source4 = source4;
          _source5 = source5;
          _source6 = source6;
          _source7 = source7;
          _source8 = source8;
          _source9 = source9;
          _resultSelector = resultSelector;
      }

      /// <summary>
      /// Creates the sink, hands it to <paramref name="setSink"/>, and starts it.
      /// </summary>
      protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var combiner = new _(this, observer, cancel);
          setSink(combiner);
          return combiner.Run();
      }

      /// <summary>
      /// Sink holding one observer per source; the base class receives the source count (9).
      /// </summary>
      private class _ : CombineLatestSink<TResult>
      {
          private readonly CombineLatest<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult> _parent;

          private CombineLatestObserver<T1> _observer1;
          private CombineLatestObserver<T2> _observer2;
          private CombineLatestObserver<T3> _observer3;
          private CombineLatestObserver<T4> _observer4;
          private CombineLatestObserver<T5> _observer5;
          private CombineLatestObserver<T6> _observer6;
          private CombineLatestObserver<T7> _observer7;
          private CombineLatestObserver<T8> _observer8;
          private CombineLatestObserver<T9> _observer9;

          public _(CombineLatest<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
              : base(9, observer, cancel)
          {
              _parent = parent;
          }

          /// <summary>
          /// Creates every observer first, then subscribes to the sources in order, and
          /// returns a composite of all subscriptions.
          /// </summary>
          public IDisposable Run()
          {
              var handles = new SingleAssignmentDisposable[9];
              for (var slot = 0; slot < handles.Length; ++slot)
              {
                  handles[slot] = new SingleAssignmentDisposable();
              }

              // Third argument is the zero-based position of the source.
              _observer1 = new CombineLatestObserver<T1>(_gate, this, 0, handles[0]);
              _observer2 = new CombineLatestObserver<T2>(_gate, this, 1, handles[1]);
              _observer3 = new CombineLatestObserver<T3>(_gate, this, 2, handles[2]);
              _observer4 = new CombineLatestObserver<T4>(_gate, this, 3, handles[3]);
              _observer5 = new CombineLatestObserver<T5>(_gate, this, 4, handles[4]);
              _observer6 = new CombineLatestObserver<T6>(_gate, this, 5, handles[5]);
              _observer7 = new CombineLatestObserver<T7>(_gate, this, 6, handles[6]);
              _observer8 = new CombineLatestObserver<T8>(_gate, this, 7, handles[7]);
              _observer9 = new CombineLatestObserver<T9>(_gate, this, 8, handles[8]);

              handles[0].Disposable = _parent._source1.SubscribeSafe(_observer1);
              handles[1].Disposable = _parent._source2.SubscribeSafe(_observer2);
              handles[2].Disposable = _parent._source3.SubscribeSafe(_observer3);
              handles[3].Disposable = _parent._source4.SubscribeSafe(_observer4);
              handles[4].Disposable = _parent._source5.SubscribeSafe(_observer5);
              handles[5].Disposable = _parent._source6.SubscribeSafe(_observer6);
              handles[6].Disposable = _parent._source7.SubscribeSafe(_observer7);
              handles[7].Disposable = _parent._source8.SubscribeSafe(_observer8);
              handles[8].Disposable = _parent._source9.SubscribeSafe(_observer9);

              return new CompositeDisposable(handles);
          }

          /// <summary>
          /// Applies the result selector to the value currently held by each observer.
          /// </summary>
          protected override TResult GetResult()
          {
              return _parent._resultSelector(
                  _observer1.Value,
                  _observer2.Value,
                  _observer3.Value,
                  _observer4.Value,
                  _observer5.Value,
                  _observer6.Value,
                  _observer7.Value,
                  _observer8.Value,
                  _observer9.Value);
          }
      }
  }
  /// <summary>
  /// Producer that combines ten observable sources through a result selector.
  /// </summary>
  internal class CombineLatest<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult> : Producer<TResult>
  {
      private readonly IObservable<T1> _source1;
      private readonly IObservable<T2> _source2;
      private readonly IObservable<T3> _source3;
      private readonly IObservable<T4> _source4;
      private readonly IObservable<T5> _source5;
      private readonly IObservable<T6> _source6;
      private readonly IObservable<T7> _source7;
      private readonly IObservable<T8> _source8;
      private readonly IObservable<T9> _source9;
      private readonly IObservable<T10> _source10;
      private readonly Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult> _resultSelector;

      /// <summary>
      /// Stores the sources and the selector; no subscription happens here.
      /// </summary>
      public CombineLatest(
          IObservable<T1> source1,
          IObservable<T2> source2,
          IObservable<T3> source3,
          IObservable<T4> source4,
          IObservable<T5> source5,
          IObservable<T6> source6,
          IObservable<T7> source7,
          IObservable<T8> source8,
          IObservable<T9> source9,
          IObservable<T10> source10,
          Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult> resultSelector)
      {
          _source1 = source1;
          _source2 = source2;
          _source3 = source3;
          _source4 = source4;
          _source5 = source5;
          _source6 = source6;
          _source7 = source7;
          _source8 = source8;
          _source9 = source9;
          _source10 = source10;
          _resultSelector = resultSelector;
      }

      /// <summary>
      /// Creates the sink, hands it to <paramref name="setSink"/>, and starts it.
      /// </summary>
      protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var combiner = new _(this, observer, cancel);
          setSink(combiner);
          return combiner.Run();
      }

      /// <summary>
      /// Sink holding one observer per source; the base class receives the source count (10).
      /// </summary>
      private class _ : CombineLatestSink<TResult>
      {
          private readonly CombineLatest<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult> _parent;

          private CombineLatestObserver<T1> _observer1;
          private CombineLatestObserver<T2> _observer2;
          private CombineLatestObserver<T3> _observer3;
          private CombineLatestObserver<T4> _observer4;
          private CombineLatestObserver<T5> _observer5;
          private CombineLatestObserver<T6> _observer6;
          private CombineLatestObserver<T7> _observer7;
          private CombineLatestObserver<T8> _observer8;
          private CombineLatestObserver<T9> _observer9;
          private CombineLatestObserver<T10> _observer10;

          public _(CombineLatest<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
              : base(10, observer, cancel)
          {
              _parent = parent;
          }

          /// <summary>
          /// Creates every observer first, then subscribes to the sources in order, and
          /// returns a composite of all subscriptions.
          /// </summary>
          public IDisposable Run()
          {
              var handles = new SingleAssignmentDisposable[10];
              for (var slot = 0; slot < handles.Length; ++slot)
              {
                  handles[slot] = new SingleAssignmentDisposable();
              }

              // Third argument is the zero-based position of the source.
              _observer1 = new CombineLatestObserver<T1>(_gate, this, 0, handles[0]);
              _observer2 = new CombineLatestObserver<T2>(_gate, this, 1, handles[1]);
              _observer3 = new CombineLatestObserver<T3>(_gate, this, 2, handles[2]);
              _observer4 = new CombineLatestObserver<T4>(_gate, this, 3, handles[3]);
              _observer5 = new CombineLatestObserver<T5>(_gate, this, 4, handles[4]);
              _observer6 = new CombineLatestObserver<T6>(_gate, this, 5, handles[5]);
              _observer7 = new CombineLatestObserver<T7>(_gate, this, 6, handles[6]);
              _observer8 = new CombineLatestObserver<T8>(_gate, this, 7, handles[7]);
              _observer9 = new CombineLatestObserver<T9>(_gate, this, 8, handles[8]);
              _observer10 = new CombineLatestObserver<T10>(_gate, this, 9, handles[9]);

              handles[0].Disposable = _parent._source1.SubscribeSafe(_observer1);
              handles[1].Disposable = _parent._source2.SubscribeSafe(_observer2);
              handles[2].Disposable = _parent._source3.SubscribeSafe(_observer3);
              handles[3].Disposable = _parent._source4.SubscribeSafe(_observer4);
              handles[4].Disposable = _parent._source5.SubscribeSafe(_observer5);
              handles[5].Disposable = _parent._source6.SubscribeSafe(_observer6);
              handles[6].Disposable = _parent._source7.SubscribeSafe(_observer7);
              handles[7].Disposable = _parent._source8.SubscribeSafe(_observer8);
              handles[8].Disposable = _parent._source9.SubscribeSafe(_observer9);
              handles[9].Disposable = _parent._source10.SubscribeSafe(_observer10);

              return new CompositeDisposable(handles);
          }

          /// <summary>
          /// Applies the result selector to the value currently held by each observer.
          /// </summary>
          protected override TResult GetResult()
          {
              return _parent._resultSelector(
                  _observer1.Value,
                  _observer2.Value,
                  _observer3.Value,
                  _observer4.Value,
                  _observer5.Value,
                  _observer6.Value,
                  _observer7.Value,
                  _observer8.Value,
                  _observer9.Value,
                  _observer10.Value);
          }
      }
  }
  /// <summary>
  /// Producer behind the eleven-source CombineLatest overload. It stores the source
  /// sequences and the result selector; every Run call creates a sink that subscribes
  /// to all of them.
  /// </summary>
  class CombineLatest<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult> : Producer<TResult>
  {
      private readonly IObservable<T1> _source1;
      private readonly IObservable<T2> _source2;
      private readonly IObservable<T3> _source3;
      private readonly IObservable<T4> _source4;
      private readonly IObservable<T5> _source5;
      private readonly IObservable<T6> _source6;
      private readonly IObservable<T7> _source7;
      private readonly IObservable<T8> _source8;
      private readonly IObservable<T9> _source9;
      private readonly IObservable<T10> _source10;
      private readonly IObservable<T11> _source11;
      private readonly Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult> _resultSelector;

      /// <summary>
      /// Captures the eleven sources and the selector; nothing is subscribed here.
      /// </summary>
      public CombineLatest(
          IObservable<T1> source1,
          IObservable<T2> source2,
          IObservable<T3> source3,
          IObservable<T4> source4,
          IObservable<T5> source5,
          IObservable<T6> source6,
          IObservable<T7> source7,
          IObservable<T8> source8,
          IObservable<T9> source9,
          IObservable<T10> source10,
          IObservable<T11> source11,
          Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult> resultSelector)
      {
          _source1 = source1;
          _source2 = source2;
          _source3 = source3;
          _source4 = source4;
          _source5 = source5;
          _source6 = source6;
          _source7 = source7;
          _source8 = source8;
          _source9 = source9;
          _source10 = source10;
          _source11 = source11;
          _resultSelector = resultSelector;
      }

      /// <summary>
      /// Builds the sink, publishes it through <paramref name="setSink"/>, then starts it.
      /// </summary>
      protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var combiner = new _(this, observer, cancel);
          setSink(combiner);
          return combiner.Run();
      }

      /// <summary>
      /// Sink that attaches one observer per source and evaluates the result selector
      /// over the observers' Value properties.
      /// </summary>
      class _ : CombineLatestSink<TResult>
      {
          private readonly CombineLatest<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult> _parent;

          private CombineLatestObserver<T1> _observer1;
          private CombineLatestObserver<T2> _observer2;
          private CombineLatestObserver<T3> _observer3;
          private CombineLatestObserver<T4> _observer4;
          private CombineLatestObserver<T5> _observer5;
          private CombineLatestObserver<T6> _observer6;
          private CombineLatestObserver<T7> _observer7;
          private CombineLatestObserver<T8> _observer8;
          private CombineLatestObserver<T9> _observer9;
          private CombineLatestObserver<T10> _observer10;
          private CombineLatestObserver<T11> _observer11;

          /// <summary>
          /// Creates the sink; the source count (11) is forwarded to the base sink.
          /// </summary>
          public _(CombineLatest<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
              : base(11, observer, cancel)
          {
              _parent = parent;
          }

          /// <summary>
          /// Subscribes to the eleven sources in order.
          /// </summary>
          /// <returns>A composite disposable holding all eleven subscriptions.</returns>
          public IDisposable Run()
          {
              // One assignable subscription slot per source.
              var handles = new SingleAssignmentDisposable[11];
              for (var slot = 0; slot < handles.Length; slot++)
              {
                  handles[slot] = new SingleAssignmentDisposable();
              }

              // Each observer receives the gate, this sink, its zero-based source index and its slot.
              // All observer fields are assigned before the first SubscribeSafe call, so they are
              // non-null if a source calls back synchronously while subscribing.
              _observer1 = new CombineLatestObserver<T1>(_gate, this, 0, handles[0]);
              _observer2 = new CombineLatestObserver<T2>(_gate, this, 1, handles[1]);
              _observer3 = new CombineLatestObserver<T3>(_gate, this, 2, handles[2]);
              _observer4 = new CombineLatestObserver<T4>(_gate, this, 3, handles[3]);
              _observer5 = new CombineLatestObserver<T5>(_gate, this, 4, handles[4]);
              _observer6 = new CombineLatestObserver<T6>(_gate, this, 5, handles[5]);
              _observer7 = new CombineLatestObserver<T7>(_gate, this, 6, handles[6]);
              _observer8 = new CombineLatestObserver<T8>(_gate, this, 7, handles[7]);
              _observer9 = new CombineLatestObserver<T9>(_gate, this, 8, handles[8]);
              _observer10 = new CombineLatestObserver<T10>(_gate, this, 9, handles[9]);
              _observer11 = new CombineLatestObserver<T11>(_gate, this, 10, handles[10]);

              // Subscribe in source order, storing each subscription in its slot.
              handles[0].Disposable = _parent._source1.SubscribeSafe(_observer1);
              handles[1].Disposable = _parent._source2.SubscribeSafe(_observer2);
              handles[2].Disposable = _parent._source3.SubscribeSafe(_observer3);
              handles[3].Disposable = _parent._source4.SubscribeSafe(_observer4);
              handles[4].Disposable = _parent._source5.SubscribeSafe(_observer5);
              handles[5].Disposable = _parent._source6.SubscribeSafe(_observer6);
              handles[6].Disposable = _parent._source7.SubscribeSafe(_observer7);
              handles[7].Disposable = _parent._source8.SubscribeSafe(_observer8);
              handles[8].Disposable = _parent._source9.SubscribeSafe(_observer9);
              handles[9].Disposable = _parent._source10.SubscribeSafe(_observer10);
              handles[10].Disposable = _parent._source11.SubscribeSafe(_observer11);

              return new CompositeDisposable(handles);
          }

          /// <summary>
          /// Applies the parent's result selector to the Value of every observer, in source order.
          /// </summary>
          protected override TResult GetResult()
          {
              return _parent._resultSelector(
                  _observer1.Value,
                  _observer2.Value,
                  _observer3.Value,
                  _observer4.Value,
                  _observer5.Value,
                  _observer6.Value,
                  _observer7.Value,
                  _observer8.Value,
                  _observer9.Value,
                  _observer10.Value,
                  _observer11.Value);
          }
      }
  }
  /// <summary>
  /// Producer behind the twelve-source CombineLatest overload. It stores the source
  /// sequences and the result selector; every Run call creates a sink that subscribes
  /// to all of them.
  /// </summary>
  class CombineLatest<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult> : Producer<TResult>
  {
      private readonly IObservable<T1> _source1;
      private readonly IObservable<T2> _source2;
      private readonly IObservable<T3> _source3;
      private readonly IObservable<T4> _source4;
      private readonly IObservable<T5> _source5;
      private readonly IObservable<T6> _source6;
      private readonly IObservable<T7> _source7;
      private readonly IObservable<T8> _source8;
      private readonly IObservable<T9> _source9;
      private readonly IObservable<T10> _source10;
      private readonly IObservable<T11> _source11;
      private readonly IObservable<T12> _source12;
      private readonly Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult> _resultSelector;

      /// <summary>
      /// Captures the twelve sources and the selector; nothing is subscribed here.
      /// </summary>
      public CombineLatest(
          IObservable<T1> source1,
          IObservable<T2> source2,
          IObservable<T3> source3,
          IObservable<T4> source4,
          IObservable<T5> source5,
          IObservable<T6> source6,
          IObservable<T7> source7,
          IObservable<T8> source8,
          IObservable<T9> source9,
          IObservable<T10> source10,
          IObservable<T11> source11,
          IObservable<T12> source12,
          Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult> resultSelector)
      {
          _source1 = source1;
          _source2 = source2;
          _source3 = source3;
          _source4 = source4;
          _source5 = source5;
          _source6 = source6;
          _source7 = source7;
          _source8 = source8;
          _source9 = source9;
          _source10 = source10;
          _source11 = source11;
          _source12 = source12;
          _resultSelector = resultSelector;
      }

      /// <summary>
      /// Builds the sink, publishes it through <paramref name="setSink"/>, then starts it.
      /// </summary>
      protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var combiner = new _(this, observer, cancel);
          setSink(combiner);
          return combiner.Run();
      }

      /// <summary>
      /// Sink that attaches one observer per source and evaluates the result selector
      /// over the observers' Value properties.
      /// </summary>
      class _ : CombineLatestSink<TResult>
      {
          private readonly CombineLatest<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult> _parent;

          private CombineLatestObserver<T1> _observer1;
          private CombineLatestObserver<T2> _observer2;
          private CombineLatestObserver<T3> _observer3;
          private CombineLatestObserver<T4> _observer4;
          private CombineLatestObserver<T5> _observer5;
          private CombineLatestObserver<T6> _observer6;
          private CombineLatestObserver<T7> _observer7;
          private CombineLatestObserver<T8> _observer8;
          private CombineLatestObserver<T9> _observer9;
          private CombineLatestObserver<T10> _observer10;
          private CombineLatestObserver<T11> _observer11;
          private CombineLatestObserver<T12> _observer12;

          /// <summary>
          /// Creates the sink; the source count (12) is forwarded to the base sink.
          /// </summary>
          public _(CombineLatest<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
              : base(12, observer, cancel)
          {
              _parent = parent;
          }

          /// <summary>
          /// Subscribes to the twelve sources in order.
          /// </summary>
          /// <returns>A composite disposable holding all twelve subscriptions.</returns>
          public IDisposable Run()
          {
              // One assignable subscription slot per source.
              var handles = new SingleAssignmentDisposable[12];
              for (var slot = 0; slot < handles.Length; slot++)
              {
                  handles[slot] = new SingleAssignmentDisposable();
              }

              // Each observer receives the gate, this sink, its zero-based source index and its slot.
              // All observer fields are assigned before the first SubscribeSafe call, so they are
              // non-null if a source calls back synchronously while subscribing.
              _observer1 = new CombineLatestObserver<T1>(_gate, this, 0, handles[0]);
              _observer2 = new CombineLatestObserver<T2>(_gate, this, 1, handles[1]);
              _observer3 = new CombineLatestObserver<T3>(_gate, this, 2, handles[2]);
              _observer4 = new CombineLatestObserver<T4>(_gate, this, 3, handles[3]);
              _observer5 = new CombineLatestObserver<T5>(_gate, this, 4, handles[4]);
              _observer6 = new CombineLatestObserver<T6>(_gate, this, 5, handles[5]);
              _observer7 = new CombineLatestObserver<T7>(_gate, this, 6, handles[6]);
              _observer8 = new CombineLatestObserver<T8>(_gate, this, 7, handles[7]);
              _observer9 = new CombineLatestObserver<T9>(_gate, this, 8, handles[8]);
              _observer10 = new CombineLatestObserver<T10>(_gate, this, 9, handles[9]);
              _observer11 = new CombineLatestObserver<T11>(_gate, this, 10, handles[10]);
              _observer12 = new CombineLatestObserver<T12>(_gate, this, 11, handles[11]);

              // Subscribe in source order, storing each subscription in its slot.
              handles[0].Disposable = _parent._source1.SubscribeSafe(_observer1);
              handles[1].Disposable = _parent._source2.SubscribeSafe(_observer2);
              handles[2].Disposable = _parent._source3.SubscribeSafe(_observer3);
              handles[3].Disposable = _parent._source4.SubscribeSafe(_observer4);
              handles[4].Disposable = _parent._source5.SubscribeSafe(_observer5);
              handles[5].Disposable = _parent._source6.SubscribeSafe(_observer6);
              handles[6].Disposable = _parent._source7.SubscribeSafe(_observer7);
              handles[7].Disposable = _parent._source8.SubscribeSafe(_observer8);
              handles[8].Disposable = _parent._source9.SubscribeSafe(_observer9);
              handles[9].Disposable = _parent._source10.SubscribeSafe(_observer10);
              handles[10].Disposable = _parent._source11.SubscribeSafe(_observer11);
              handles[11].Disposable = _parent._source12.SubscribeSafe(_observer12);

              return new CompositeDisposable(handles);
          }

          /// <summary>
          /// Applies the parent's result selector to the Value of every observer, in source order.
          /// </summary>
          protected override TResult GetResult()
          {
              return _parent._resultSelector(
                  _observer1.Value,
                  _observer2.Value,
                  _observer3.Value,
                  _observer4.Value,
                  _observer5.Value,
                  _observer6.Value,
                  _observer7.Value,
                  _observer8.Value,
                  _observer9.Value,
                  _observer10.Value,
                  _observer11.Value,
                  _observer12.Value);
          }
      }
  }
  /// <summary>
  /// Producer behind the thirteen-source CombineLatest overload. It stores the source
  /// sequences and the result selector; every Run call creates a sink that subscribes
  /// to all of them.
  /// </summary>
  class CombineLatest<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult> : Producer<TResult>
  {
      private readonly IObservable<T1> _source1;
      private readonly IObservable<T2> _source2;
      private readonly IObservable<T3> _source3;
      private readonly IObservable<T4> _source4;
      private readonly IObservable<T5> _source5;
      private readonly IObservable<T6> _source6;
      private readonly IObservable<T7> _source7;
      private readonly IObservable<T8> _source8;
      private readonly IObservable<T9> _source9;
      private readonly IObservable<T10> _source10;
      private readonly IObservable<T11> _source11;
      private readonly IObservable<T12> _source12;
      private readonly IObservable<T13> _source13;
      private readonly Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult> _resultSelector;

      /// <summary>
      /// Captures the thirteen sources and the selector; nothing is subscribed here.
      /// </summary>
      public CombineLatest(
          IObservable<T1> source1,
          IObservable<T2> source2,
          IObservable<T3> source3,
          IObservable<T4> source4,
          IObservable<T5> source5,
          IObservable<T6> source6,
          IObservable<T7> source7,
          IObservable<T8> source8,
          IObservable<T9> source9,
          IObservable<T10> source10,
          IObservable<T11> source11,
          IObservable<T12> source12,
          IObservable<T13> source13,
          Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult> resultSelector)
      {
          _source1 = source1;
          _source2 = source2;
          _source3 = source3;
          _source4 = source4;
          _source5 = source5;
          _source6 = source6;
          _source7 = source7;
          _source8 = source8;
          _source9 = source9;
          _source10 = source10;
          _source11 = source11;
          _source12 = source12;
          _source13 = source13;
          _resultSelector = resultSelector;
      }

      /// <summary>
      /// Builds the sink, publishes it through <paramref name="setSink"/>, then starts it.
      /// </summary>
      protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var combiner = new _(this, observer, cancel);
          setSink(combiner);
          return combiner.Run();
      }

      /// <summary>
      /// Sink that attaches one observer per source and evaluates the result selector
      /// over the observers' Value properties.
      /// </summary>
      class _ : CombineLatestSink<TResult>
      {
          private readonly CombineLatest<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult> _parent;

          private CombineLatestObserver<T1> _observer1;
          private CombineLatestObserver<T2> _observer2;
          private CombineLatestObserver<T3> _observer3;
          private CombineLatestObserver<T4> _observer4;
          private CombineLatestObserver<T5> _observer5;
          private CombineLatestObserver<T6> _observer6;
          private CombineLatestObserver<T7> _observer7;
          private CombineLatestObserver<T8> _observer8;
          private CombineLatestObserver<T9> _observer9;
          private CombineLatestObserver<T10> _observer10;
          private CombineLatestObserver<T11> _observer11;
          private CombineLatestObserver<T12> _observer12;
          private CombineLatestObserver<T13> _observer13;

          /// <summary>
          /// Creates the sink; the source count (13) is forwarded to the base sink.
          /// </summary>
          public _(CombineLatest<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
              : base(13, observer, cancel)
          {
              _parent = parent;
          }

          /// <summary>
          /// Subscribes to the thirteen sources in order.
          /// </summary>
          /// <returns>A composite disposable holding all thirteen subscriptions.</returns>
          public IDisposable Run()
          {
              // One assignable subscription slot per source.
              var handles = new SingleAssignmentDisposable[13];
              for (var slot = 0; slot < handles.Length; slot++)
              {
                  handles[slot] = new SingleAssignmentDisposable();
              }

              // Each observer receives the gate, this sink, its zero-based source index and its slot.
              // All observer fields are assigned before the first SubscribeSafe call, so they are
              // non-null if a source calls back synchronously while subscribing.
              _observer1 = new CombineLatestObserver<T1>(_gate, this, 0, handles[0]);
              _observer2 = new CombineLatestObserver<T2>(_gate, this, 1, handles[1]);
              _observer3 = new CombineLatestObserver<T3>(_gate, this, 2, handles[2]);
              _observer4 = new CombineLatestObserver<T4>(_gate, this, 3, handles[3]);
              _observer5 = new CombineLatestObserver<T5>(_gate, this, 4, handles[4]);
              _observer6 = new CombineLatestObserver<T6>(_gate, this, 5, handles[5]);
              _observer7 = new CombineLatestObserver<T7>(_gate, this, 6, handles[6]);
              _observer8 = new CombineLatestObserver<T8>(_gate, this, 7, handles[7]);
              _observer9 = new CombineLatestObserver<T9>(_gate, this, 8, handles[8]);
              _observer10 = new CombineLatestObserver<T10>(_gate, this, 9, handles[9]);
              _observer11 = new CombineLatestObserver<T11>(_gate, this, 10, handles[10]);
              _observer12 = new CombineLatestObserver<T12>(_gate, this, 11, handles[11]);
              _observer13 = new CombineLatestObserver<T13>(_gate, this, 12, handles[12]);

              // Subscribe in source order, storing each subscription in its slot.
              handles[0].Disposable = _parent._source1.SubscribeSafe(_observer1);
              handles[1].Disposable = _parent._source2.SubscribeSafe(_observer2);
              handles[2].Disposable = _parent._source3.SubscribeSafe(_observer3);
              handles[3].Disposable = _parent._source4.SubscribeSafe(_observer4);
              handles[4].Disposable = _parent._source5.SubscribeSafe(_observer5);
              handles[5].Disposable = _parent._source6.SubscribeSafe(_observer6);
              handles[6].Disposable = _parent._source7.SubscribeSafe(_observer7);
              handles[7].Disposable = _parent._source8.SubscribeSafe(_observer8);
              handles[8].Disposable = _parent._source9.SubscribeSafe(_observer9);
              handles[9].Disposable = _parent._source10.SubscribeSafe(_observer10);
              handles[10].Disposable = _parent._source11.SubscribeSafe(_observer11);
              handles[11].Disposable = _parent._source12.SubscribeSafe(_observer12);
              handles[12].Disposable = _parent._source13.SubscribeSafe(_observer13);

              return new CompositeDisposable(handles);
          }

          /// <summary>
          /// Applies the parent's result selector to the Value of every observer, in source order.
          /// </summary>
          protected override TResult GetResult()
          {
              return _parent._resultSelector(
                  _observer1.Value,
                  _observer2.Value,
                  _observer3.Value,
                  _observer4.Value,
                  _observer5.Value,
                  _observer6.Value,
                  _observer7.Value,
                  _observer8.Value,
                  _observer9.Value,
                  _observer10.Value,
                  _observer11.Value,
                  _observer12.Value,
                  _observer13.Value);
          }
      }
  }
  /// <summary>
  /// Producer behind the fourteen-source CombineLatest overload. It stores the source
  /// sequences and the result selector; every Run call creates a sink that subscribes
  /// to all of them.
  /// </summary>
  class CombineLatest<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult> : Producer<TResult>
  {
      private readonly IObservable<T1> _source1;
      private readonly IObservable<T2> _source2;
      private readonly IObservable<T3> _source3;
      private readonly IObservable<T4> _source4;
      private readonly IObservable<T5> _source5;
      private readonly IObservable<T6> _source6;
      private readonly IObservable<T7> _source7;
      private readonly IObservable<T8> _source8;
      private readonly IObservable<T9> _source9;
      private readonly IObservable<T10> _source10;
      private readonly IObservable<T11> _source11;
      private readonly IObservable<T12> _source12;
      private readonly IObservable<T13> _source13;
      private readonly IObservable<T14> _source14;
      private readonly Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult> _resultSelector;

      /// <summary>
      /// Captures the fourteen sources and the selector; nothing is subscribed here.
      /// </summary>
      public CombineLatest(
          IObservable<T1> source1,
          IObservable<T2> source2,
          IObservable<T3> source3,
          IObservable<T4> source4,
          IObservable<T5> source5,
          IObservable<T6> source6,
          IObservable<T7> source7,
          IObservable<T8> source8,
          IObservable<T9> source9,
          IObservable<T10> source10,
          IObservable<T11> source11,
          IObservable<T12> source12,
          IObservable<T13> source13,
          IObservable<T14> source14,
          Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult> resultSelector)
      {
          _source1 = source1;
          _source2 = source2;
          _source3 = source3;
          _source4 = source4;
          _source5 = source5;
          _source6 = source6;
          _source7 = source7;
          _source8 = source8;
          _source9 = source9;
          _source10 = source10;
          _source11 = source11;
          _source12 = source12;
          _source13 = source13;
          _source14 = source14;
          _resultSelector = resultSelector;
      }

      /// <summary>
      /// Builds the sink, publishes it through <paramref name="setSink"/>, then starts it.
      /// </summary>
      protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var combiner = new _(this, observer, cancel);
          setSink(combiner);
          return combiner.Run();
      }

      /// <summary>
      /// Sink that attaches one observer per source and evaluates the result selector
      /// over the observers' Value properties.
      /// </summary>
      class _ : CombineLatestSink<TResult>
      {
          private readonly CombineLatest<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult> _parent;

          private CombineLatestObserver<T1> _observer1;
          private CombineLatestObserver<T2> _observer2;
          private CombineLatestObserver<T3> _observer3;
          private CombineLatestObserver<T4> _observer4;
          private CombineLatestObserver<T5> _observer5;
          private CombineLatestObserver<T6> _observer6;
          private CombineLatestObserver<T7> _observer7;
          private CombineLatestObserver<T8> _observer8;
          private CombineLatestObserver<T9> _observer9;
          private CombineLatestObserver<T10> _observer10;
          private CombineLatestObserver<T11> _observer11;
          private CombineLatestObserver<T12> _observer12;
          private CombineLatestObserver<T13> _observer13;
          private CombineLatestObserver<T14> _observer14;

          /// <summary>
          /// Creates the sink; the source count (14) is forwarded to the base sink.
          /// </summary>
          public _(CombineLatest<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
              : base(14, observer, cancel)
          {
              _parent = parent;
          }

          /// <summary>
          /// Subscribes to the fourteen sources in order.
          /// </summary>
          /// <returns>A composite disposable holding all fourteen subscriptions.</returns>
          public IDisposable Run()
          {
              // One assignable subscription slot per source.
              var handles = new SingleAssignmentDisposable[14];
              for (var slot = 0; slot < handles.Length; slot++)
              {
                  handles[slot] = new SingleAssignmentDisposable();
              }

              // Each observer receives the gate, this sink, its zero-based source index and its slot.
              // All observer fields are assigned before the first SubscribeSafe call, so they are
              // non-null if a source calls back synchronously while subscribing.
              _observer1 = new CombineLatestObserver<T1>(_gate, this, 0, handles[0]);
              _observer2 = new CombineLatestObserver<T2>(_gate, this, 1, handles[1]);
              _observer3 = new CombineLatestObserver<T3>(_gate, this, 2, handles[2]);
              _observer4 = new CombineLatestObserver<T4>(_gate, this, 3, handles[3]);
              _observer5 = new CombineLatestObserver<T5>(_gate, this, 4, handles[4]);
              _observer6 = new CombineLatestObserver<T6>(_gate, this, 5, handles[5]);
              _observer7 = new CombineLatestObserver<T7>(_gate, this, 6, handles[6]);
              _observer8 = new CombineLatestObserver<T8>(_gate, this, 7, handles[7]);
              _observer9 = new CombineLatestObserver<T9>(_gate, this, 8, handles[8]);
              _observer10 = new CombineLatestObserver<T10>(_gate, this, 9, handles[9]);
              _observer11 = new CombineLatestObserver<T11>(_gate, this, 10, handles[10]);
              _observer12 = new CombineLatestObserver<T12>(_gate, this, 11, handles[11]);
              _observer13 = new CombineLatestObserver<T13>(_gate, this, 12, handles[12]);
              _observer14 = new CombineLatestObserver<T14>(_gate, this, 13, handles[13]);

              // Subscribe in source order, storing each subscription in its slot.
              handles[0].Disposable = _parent._source1.SubscribeSafe(_observer1);
              handles[1].Disposable = _parent._source2.SubscribeSafe(_observer2);
              handles[2].Disposable = _parent._source3.SubscribeSafe(_observer3);
              handles[3].Disposable = _parent._source4.SubscribeSafe(_observer4);
              handles[4].Disposable = _parent._source5.SubscribeSafe(_observer5);
              handles[5].Disposable = _parent._source6.SubscribeSafe(_observer6);
              handles[6].Disposable = _parent._source7.SubscribeSafe(_observer7);
              handles[7].Disposable = _parent._source8.SubscribeSafe(_observer8);
              handles[8].Disposable = _parent._source9.SubscribeSafe(_observer9);
              handles[9].Disposable = _parent._source10.SubscribeSafe(_observer10);
              handles[10].Disposable = _parent._source11.SubscribeSafe(_observer11);
              handles[11].Disposable = _parent._source12.SubscribeSafe(_observer12);
              handles[12].Disposable = _parent._source13.SubscribeSafe(_observer13);
              handles[13].Disposable = _parent._source14.SubscribeSafe(_observer14);

              return new CompositeDisposable(handles);
          }

          /// <summary>
          /// Applies the parent's result selector to the Value of every observer, in source order.
          /// </summary>
          protected override TResult GetResult()
          {
              return _parent._resultSelector(
                  _observer1.Value,
                  _observer2.Value,
                  _observer3.Value,
                  _observer4.Value,
                  _observer5.Value,
                  _observer6.Value,
                  _observer7.Value,
                  _observer8.Value,
                  _observer9.Value,
                  _observer10.Value,
                  _observer11.Value,
                  _observer12.Value,
                  _observer13.Value,
                  _observer14.Value);
          }
      }
  }
  /// <summary>
  /// Producer behind the fifteen-source CombineLatest overload. It stores the source
  /// sequences and the result selector; every Run call creates a sink that subscribes
  /// to all of them.
  /// </summary>
  class CombineLatest<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, TResult> : Producer<TResult>
  {
      private readonly IObservable<T1> _source1;
      private readonly IObservable<T2> _source2;
      private readonly IObservable<T3> _source3;
      private readonly IObservable<T4> _source4;
      private readonly IObservable<T5> _source5;
      private readonly IObservable<T6> _source6;
      private readonly IObservable<T7> _source7;
      private readonly IObservable<T8> _source8;
      private readonly IObservable<T9> _source9;
      private readonly IObservable<T10> _source10;
      private readonly IObservable<T11> _source11;
      private readonly IObservable<T12> _source12;
      private readonly IObservable<T13> _source13;
      private readonly IObservable<T14> _source14;
      private readonly IObservable<T15> _source15;
      private readonly Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, TResult> _resultSelector;

      /// <summary>
      /// Captures the fifteen sources and the selector; nothing is subscribed here.
      /// </summary>
      public CombineLatest(
          IObservable<T1> source1,
          IObservable<T2> source2,
          IObservable<T3> source3,
          IObservable<T4> source4,
          IObservable<T5> source5,
          IObservable<T6> source6,
          IObservable<T7> source7,
          IObservable<T8> source8,
          IObservable<T9> source9,
          IObservable<T10> source10,
          IObservable<T11> source11,
          IObservable<T12> source12,
          IObservable<T13> source13,
          IObservable<T14> source14,
          IObservable<T15> source15,
          Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, TResult> resultSelector)
      {
          _source1 = source1;
          _source2 = source2;
          _source3 = source3;
          _source4 = source4;
          _source5 = source5;
          _source6 = source6;
          _source7 = source7;
          _source8 = source8;
          _source9 = source9;
          _source10 = source10;
          _source11 = source11;
          _source12 = source12;
          _source13 = source13;
          _source14 = source14;
          _source15 = source15;
          _resultSelector = resultSelector;
      }

      /// <summary>
      /// Builds the sink, publishes it through <paramref name="setSink"/>, then starts it.
      /// </summary>
      protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var combiner = new _(this, observer, cancel);
          setSink(combiner);
          return combiner.Run();
      }

      /// <summary>
      /// Sink that attaches one observer per source and evaluates the result selector
      /// over the observers' Value properties.
      /// </summary>
      class _ : CombineLatestSink<TResult>
      {
          private readonly CombineLatest<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, TResult> _parent;

          private CombineLatestObserver<T1> _observer1;
          private CombineLatestObserver<T2> _observer2;
          private CombineLatestObserver<T3> _observer3;
          private CombineLatestObserver<T4> _observer4;
          private CombineLatestObserver<T5> _observer5;
          private CombineLatestObserver<T6> _observer6;
          private CombineLatestObserver<T7> _observer7;
          private CombineLatestObserver<T8> _observer8;
          private CombineLatestObserver<T9> _observer9;
          private CombineLatestObserver<T10> _observer10;
          private CombineLatestObserver<T11> _observer11;
          private CombineLatestObserver<T12> _observer12;
          private CombineLatestObserver<T13> _observer13;
          private CombineLatestObserver<T14> _observer14;
          private CombineLatestObserver<T15> _observer15;

          /// <summary>
          /// Creates the sink; the source count (15) is forwarded to the base sink.
          /// </summary>
          public _(CombineLatest<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
              : base(15, observer, cancel)
          {
              _parent = parent;
          }

          /// <summary>
          /// Subscribes to the fifteen sources in order.
          /// </summary>
          /// <returns>A composite disposable holding all fifteen subscriptions.</returns>
          public IDisposable Run()
          {
              // One assignable subscription slot per source.
              var handles = new SingleAssignmentDisposable[15];
              for (var slot = 0; slot < handles.Length; slot++)
              {
                  handles[slot] = new SingleAssignmentDisposable();
              }

              // Each observer receives the gate, this sink, its zero-based source index and its slot.
              // All observer fields are assigned before the first SubscribeSafe call, so they are
              // non-null if a source calls back synchronously while subscribing.
              _observer1 = new CombineLatestObserver<T1>(_gate, this, 0, handles[0]);
              _observer2 = new CombineLatestObserver<T2>(_gate, this, 1, handles[1]);
              _observer3 = new CombineLatestObserver<T3>(_gate, this, 2, handles[2]);
              _observer4 = new CombineLatestObserver<T4>(_gate, this, 3, handles[3]);
              _observer5 = new CombineLatestObserver<T5>(_gate, this, 4, handles[4]);
              _observer6 = new CombineLatestObserver<T6>(_gate, this, 5, handles[5]);
              _observer7 = new CombineLatestObserver<T7>(_gate, this, 6, handles[6]);
              _observer8 = new CombineLatestObserver<T8>(_gate, this, 7, handles[7]);
              _observer9 = new CombineLatestObserver<T9>(_gate, this, 8, handles[8]);
              _observer10 = new CombineLatestObserver<T10>(_gate, this, 9, handles[9]);
              _observer11 = new CombineLatestObserver<T11>(_gate, this, 10, handles[10]);
              _observer12 = new CombineLatestObserver<T12>(_gate, this, 11, handles[11]);
              _observer13 = new CombineLatestObserver<T13>(_gate, this, 12, handles[12]);
              _observer14 = new CombineLatestObserver<T14>(_gate, this, 13, handles[13]);
              _observer15 = new CombineLatestObserver<T15>(_gate, this, 14, handles[14]);

              // Subscribe in source order, storing each subscription in its slot.
              handles[0].Disposable = _parent._source1.SubscribeSafe(_observer1);
              handles[1].Disposable = _parent._source2.SubscribeSafe(_observer2);
              handles[2].Disposable = _parent._source3.SubscribeSafe(_observer3);
              handles[3].Disposable = _parent._source4.SubscribeSafe(_observer4);
              handles[4].Disposable = _parent._source5.SubscribeSafe(_observer5);
              handles[5].Disposable = _parent._source6.SubscribeSafe(_observer6);
              handles[6].Disposable = _parent._source7.SubscribeSafe(_observer7);
              handles[7].Disposable = _parent._source8.SubscribeSafe(_observer8);
              handles[8].Disposable = _parent._source9.SubscribeSafe(_observer9);
              handles[9].Disposable = _parent._source10.SubscribeSafe(_observer10);
              handles[10].Disposable = _parent._source11.SubscribeSafe(_observer11);
              handles[11].Disposable = _parent._source12.SubscribeSafe(_observer12);
              handles[12].Disposable = _parent._source13.SubscribeSafe(_observer13);
              handles[13].Disposable = _parent._source14.SubscribeSafe(_observer14);
              handles[14].Disposable = _parent._source15.SubscribeSafe(_observer15);

              return new CompositeDisposable(handles);
          }

          /// <summary>
          /// Applies the parent's result selector to the Value of every observer, in source order.
          /// </summary>
          protected override TResult GetResult()
          {
              return _parent._resultSelector(
                  _observer1.Value,
                  _observer2.Value,
                  _observer3.Value,
                  _observer4.Value,
                  _observer5.Value,
                  _observer6.Value,
                  _observer7.Value,
                  _observer8.Value,
                  _observer9.Value,
                  _observer10.Value,
                  _observer11.Value,
                  _observer12.Value,
                  _observer13.Value,
                  _observer14.Value,
                  _observer15.Value);
          }
      }
  }
  // Sixteen-source CombineLatest producer. It stores the sixteen sources and
  // the result selector; Run wires up a sink that subscribes to every source
  // and feeds the stored values of all sixteen observers to the selector.
  class CombineLatest<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, TResult> : Producer<TResult>
  {
      private readonly IObservable<T1> _source1;
      private readonly IObservable<T2> _source2;
      private readonly IObservable<T3> _source3;
      private readonly IObservable<T4> _source4;
      private readonly IObservable<T5> _source5;
      private readonly IObservable<T6> _source6;
      private readonly IObservable<T7> _source7;
      private readonly IObservable<T8> _source8;
      private readonly IObservable<T9> _source9;
      private readonly IObservable<T10> _source10;
      private readonly IObservable<T11> _source11;
      private readonly IObservable<T12> _source12;
      private readonly IObservable<T13> _source13;
      private readonly IObservable<T14> _source14;
      private readonly IObservable<T15> _source15;
      private readonly IObservable<T16> _source16;
      private readonly Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, TResult> _resultSelector;

      // Captures the sources and the selector; nothing is subscribed here.
      public CombineLatest(
          IObservable<T1> source1,
          IObservable<T2> source2,
          IObservable<T3> source3,
          IObservable<T4> source4,
          IObservable<T5> source5,
          IObservable<T6> source6,
          IObservable<T7> source7,
          IObservable<T8> source8,
          IObservable<T9> source9,
          IObservable<T10> source10,
          IObservable<T11> source11,
          IObservable<T12> source12,
          IObservable<T13> source13,
          IObservable<T14> source14,
          IObservable<T15> source15,
          IObservable<T16> source16,
          Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, TResult> resultSelector)
      {
          _source1 = source1;
          _source2 = source2;
          _source3 = source3;
          _source4 = source4;
          _source5 = source5;
          _source6 = source6;
          _source7 = source7;
          _source8 = source8;
          _source9 = source9;
          _source10 = source10;
          _source11 = source11;
          _source12 = source12;
          _source13 = source13;
          _source14 = source14;
          _source15 = source15;
          _source16 = source16;
          _resultSelector = resultSelector;
      }

      // Creates the sink, publishes it through setSink, then starts it and
      // returns the disposable produced by the sink's Run.
      protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var sink = new _(this, observer, cancel);
          setSink(sink);
          return sink.Run();
      }

      // Sink holding one CombineLatestObserver per source.
      class _ : CombineLatestSink<TResult>
      {
          // Number of sources handled by this overload.
          private const int Arity = 16;

          private readonly CombineLatest<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, TResult> _parent;

          private CombineLatestObserver<T1> _observer1;
          private CombineLatestObserver<T2> _observer2;
          private CombineLatestObserver<T3> _observer3;
          private CombineLatestObserver<T4> _observer4;
          private CombineLatestObserver<T5> _observer5;
          private CombineLatestObserver<T6> _observer6;
          private CombineLatestObserver<T7> _observer7;
          private CombineLatestObserver<T8> _observer8;
          private CombineLatestObserver<T9> _observer9;
          private CombineLatestObserver<T10> _observer10;
          private CombineLatestObserver<T11> _observer11;
          private CombineLatestObserver<T12> _observer12;
          private CombineLatestObserver<T13> _observer13;
          private CombineLatestObserver<T14> _observer14;
          private CombineLatestObserver<T15> _observer15;
          private CombineLatestObserver<T16> _observer16;

          public _(CombineLatest<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
              : base(Arity, observer, cancel)
          {
              _parent = parent;
          }

          // Subscribes to all sources and returns a composite of the
          // per-source subscription holders.
          public IDisposable Run()
          {
              // One holder per source, allocated up front so each observer can
              // be handed its own subscription before any source is subscribed.
              var holders = new SingleAssignmentDisposable[Arity];
              for (var slot = 0; slot < holders.Length; slot++)
              {
                  holders[slot] = new SingleAssignmentDisposable();
              }

              // All observers are created before the first subscription is made.
              _observer1 = new CombineLatestObserver<T1>(_gate, this, 0, holders[0]);
              _observer2 = new CombineLatestObserver<T2>(_gate, this, 1, holders[1]);
              _observer3 = new CombineLatestObserver<T3>(_gate, this, 2, holders[2]);
              _observer4 = new CombineLatestObserver<T4>(_gate, this, 3, holders[3]);
              _observer5 = new CombineLatestObserver<T5>(_gate, this, 4, holders[4]);
              _observer6 = new CombineLatestObserver<T6>(_gate, this, 5, holders[5]);
              _observer7 = new CombineLatestObserver<T7>(_gate, this, 6, holders[6]);
              _observer8 = new CombineLatestObserver<T8>(_gate, this, 7, holders[7]);
              _observer9 = new CombineLatestObserver<T9>(_gate, this, 8, holders[8]);
              _observer10 = new CombineLatestObserver<T10>(_gate, this, 9, holders[9]);
              _observer11 = new CombineLatestObserver<T11>(_gate, this, 10, holders[10]);
              _observer12 = new CombineLatestObserver<T12>(_gate, this, 11, holders[11]);
              _observer13 = new CombineLatestObserver<T13>(_gate, this, 12, holders[12]);
              _observer14 = new CombineLatestObserver<T14>(_gate, this, 13, holders[13]);
              _observer15 = new CombineLatestObserver<T15>(_gate, this, 14, holders[14]);
              _observer16 = new CombineLatestObserver<T16>(_gate, this, 15, holders[15]);

              holders[0].Disposable = _parent._source1.SubscribeSafe(_observer1);
              holders[1].Disposable = _parent._source2.SubscribeSafe(_observer2);
              holders[2].Disposable = _parent._source3.SubscribeSafe(_observer3);
              holders[3].Disposable = _parent._source4.SubscribeSafe(_observer4);
              holders[4].Disposable = _parent._source5.SubscribeSafe(_observer5);
              holders[5].Disposable = _parent._source6.SubscribeSafe(_observer6);
              holders[6].Disposable = _parent._source7.SubscribeSafe(_observer7);
              holders[7].Disposable = _parent._source8.SubscribeSafe(_observer8);
              holders[8].Disposable = _parent._source9.SubscribeSafe(_observer9);
              holders[9].Disposable = _parent._source10.SubscribeSafe(_observer10);
              holders[10].Disposable = _parent._source11.SubscribeSafe(_observer11);
              holders[11].Disposable = _parent._source12.SubscribeSafe(_observer12);
              holders[12].Disposable = _parent._source13.SubscribeSafe(_observer13);
              holders[13].Disposable = _parent._source14.SubscribeSafe(_observer14);
              holders[14].Disposable = _parent._source15.SubscribeSafe(_observer15);
              holders[15].Disposable = _parent._source16.SubscribeSafe(_observer16);

              return new CompositeDisposable(holders);
          }

          // Applies the parent's selector to the value currently stored in
          // each of the sixteen observers, in source order.
          protected override TResult GetResult()
          {
              return _parent._resultSelector(
                  _observer1.Value,
                  _observer2.Value,
                  _observer3.Value,
                  _observer4.Value,
                  _observer5.Value,
                  _observer6.Value,
                  _observer7.Value,
                  _observer8.Value,
                  _observer9.Value,
                  _observer10.Value,
                  _observer11.Value,
                  _observer12.Value,
                  _observer13.Value,
                  _observer14.Value,
                  _observer15.Value,
                  _observer16.Value);
          }
      }
  }
  1336. #endif
  1337. #endregion
  1338. #region Helpers for n-ary overloads
  // Callback contract through which a per-source observer reports its
  // notifications; each source is identified by its zero-based index.
  interface ICombineLatest
  {
      // The source at the given index has stored a new value.
      void Next(int index);

      // A source has failed with the given error.
      void Fail(Exception error);

      // The source at the given index has completed.
      void Done(int index);
  }
  // Shared bookkeeping for the fixed-arity CombineLatest sinks. It tracks,
  // per source index, whether a value has been seen and whether the source
  // has completed, and asks the derived class for the combined result once
  // every source has produced a value.
  //
  // The methods here take no lock themselves; CombineLatestObserver calls
  // them while holding the gate object created in the constructor.
  abstract class CombineLatestSink<TResult> : Sink<TResult>, ICombineLatest
  {
      protected readonly object _gate;

      private readonly bool[] _hasValue;
      private readonly bool[] _isDone;

      // Latched to true once every entry of _hasValue is set, so the array
      // is not rescanned on later notifications.
      private bool _hasValueAll;

      // arity is the number of sources; it sizes both flag arrays.
      public CombineLatestSink(int arity, IObserver<TResult> observer, IDisposable cancel)
          : base(observer, cancel)
      {
          _gate = new object();
          _hasValue = new bool[arity];
          _isDone = new bool[arity];
      }

      // Handles a new value from the source at the given index.
      public void Next(int index)
      {
          if (!_hasValueAll)
          {
              _hasValue[index] = true;
              _hasValueAll = AllSet(_hasValue);
          }

          if (!_hasValueAll)
          {
              // Some source has not produced a value yet. If every source
              // other than this one is already done, complete the sequence.
              if (AllSetExcept(_isDone, index))
              {
                  base._observer.OnCompleted();
                  base.Dispose();
              }

              return;
          }

          TResult combined;
          try
          {
              combined = GetResult();
          }
          catch (Exception ex)
          {
              // A failure while computing the result is forwarded as OnError.
              base._observer.OnError(ex);
              base.Dispose();
              return;
          }

          base._observer.OnNext(combined);
      }

      // Computes the combined result from the values held by the derived sink.
      protected abstract TResult GetResult();

      // Forwards the error downstream and disposes the sink.
      public void Fail(Exception error)
      {
          base._observer.OnError(error);
          base.Dispose();
      }

      // Marks the source at the given index as completed; when all sources
      // are completed, signals completion downstream and disposes the sink.
      public void Done(int index)
      {
          _isDone[index] = true;

          if (AllSet(_isDone))
          {
              base._observer.OnCompleted();
              base.Dispose();
          }
      }

      // True when every flag is set (also true for an empty array).
      private static bool AllSet(bool[] flags)
      {
          for (var i = 0; i < flags.Length; i++)
          {
              if (!flags[i])
              {
                  return false;
              }
          }

          return true;
      }

      // True when every flag other than the one at position 'skip' is set.
      private static bool AllSetExcept(bool[] flags, int skip)
      {
          for (var i = 0; i < flags.Length; i++)
          {
              if (i != skip && !flags[i])
              {
                  return false;
              }
          }

          return true;
      }
  }
  // Observer attached to a single source of a fixed-arity CombineLatest.
  // It keeps the most recent value it received and relays every
  // notification to the ICombineLatest parent, tagged with its source index,
  // while holding the shared gate.
  class CombineLatestObserver<T> : IObserver<T>
  {
      private readonly object _sync;
      private readonly ICombineLatest _sink;
      private readonly int _slot;
      private readonly IDisposable _subscription;
      private T _latest;

      // gate:   lock object held while calling into the parent.
      // parent: receiver of the Next/Fail/Done callbacks.
      // index:  index passed back to the parent in Next and Done.
      // self:   disposable that is disposed on OnError and OnCompleted.
      public CombineLatestObserver(object gate, ICombineLatest parent, int index, IDisposable self)
      {
          _sync = gate;
          _sink = parent;
          _slot = index;
          _subscription = self;
      }

      // The value most recently passed to OnNext (default(T) before any).
      public T Value
      {
          get { return _latest; }
      }

      // Stores the value and notifies the parent, both under the gate.
      public void OnNext(T value)
      {
          lock (_sync)
          {
              _latest = value;
              _sink.Next(_slot);
          }
      }

      // Disposes this observer's own disposable before taking the gate,
      // then reports the failure to the parent.
      public void OnError(Exception error)
      {
          _subscription.Dispose();

          lock (_sync)
          {
              _sink.Fail(error);
          }
      }

      // Disposes this observer's own disposable before taking the gate,
      // then reports completion of this source to the parent.
      public void OnCompleted()
      {
          _subscription.Dispose();

          lock (_sync)
          {
              _sink.Done(_slot);
          }
      }
  }
  1476. #endregion
  1477. #endregion
  1478. #region N-ary
  // CombineLatest over a run-time collection of same-typed sources. The
  // selector receives a read-only list holding the latest value of every
  // source, positioned by source index.
  class CombineLatest<TSource, TResult> : Producer<TResult>
  {
      private readonly IEnumerable<IObservable<TSource>> _sources;
      private readonly Func<IList<TSource>, TResult> _resultSelector;

      // Captures the source collection and selector; enumeration of the
      // collection is deferred until the sink runs.
      public CombineLatest(IEnumerable<IObservable<TSource>> sources, Func<IList<TSource>, TResult> resultSelector)
      {
          _sources = sources;
          _resultSelector = resultSelector;
      }

      // Creates the sink, publishes it through setSink, then starts it and
      // returns the disposable produced by the sink's Run.
      protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var sink = new _(this, observer, cancel);
          setSink(sink);
          return sink.Run();
      }

      class _ : Sink<TResult>
      {
          private readonly CombineLatest<TSource, TResult> _parent;

          private object _gate;
          private bool[] _hasValue;
          private bool _hasValueAll;
          private List<TSource> _values;
          private bool[] _isDone;
          private IDisposable[] _subscriptions;

          public _(CombineLatest<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              _parent = parent;
          }

          // Snapshots the sources, allocates the per-source state and
          // subscribes to each source in order.
          public IDisposable Run()
          {
              var sources = _parent._sources.ToArray();
              var count = sources.Length;

              _gate = new object();
              _hasValueAll = false;
              _hasValue = new bool[count];
              _isDone = new bool[count];
              _subscriptions = new IDisposable[count];

              // Pre-fill the value list so it can be written by index.
              _values = new List<TSource>(count);
              for (var k = 0; k < count; k++)
              {
                  _values.Add(default(TSource));
              }

              for (var i = 0; i < count; i++)
              {
                  // The holder is stored before subscribing, so OnCompleted
                  // can find it even if the source completes during subscribe.
                  var holder = new SingleAssignmentDisposable();
                  _subscriptions[i] = holder;

                  var observer = new O(this, i);
                  holder.Disposable = sources[i].SubscribeSafe(observer);
              }

              return new CompositeDisposable(_subscriptions);
          }

          // Records a value from the source at the given index and, once all
          // sources have a value, emits the selector's result.
          private void OnNext(int index, TSource value)
          {
              lock (_gate)
              {
                  _values[index] = value;
                  _hasValue[index] = true;

                  if (!_hasValueAll)
                  {
                      _hasValueAll = AllSet(_hasValue);
                  }

                  if (_hasValueAll)
                  {
                      TResult combined;
                      try
                      {
                          combined = _parent._resultSelector(new ReadOnlyCollection<TSource>(_values));
                      }
                      catch (Exception ex)
                      {
                          // A selector failure is forwarded as OnError.
                          base._observer.OnError(ex);
                          base.Dispose();
                          return;
                      }

                      _observer.OnNext(combined);
                  }
                  else if (AllSetExcept(_isDone, index))
                  {
                      // Some source has no value yet while every source other
                      // than this one is already done: complete the sequence.
                      base._observer.OnCompleted();
                      base.Dispose();
                  }
              }
          }

          // Forwards a source error downstream and disposes the sink.
          private void OnError(Exception error)
          {
              lock (_gate)
              {
                  base._observer.OnError(error);
                  base.Dispose();
              }
          }

          // Marks the source at the given index as done. Completes the
          // sequence when all sources are done; otherwise only that source's
          // subscription is disposed.
          private void OnCompleted(int index)
          {
              lock (_gate)
              {
                  _isDone[index] = true;

                  if (!AllSet(_isDone))
                  {
                      _subscriptions[index].Dispose();
                      return;
                  }

                  base._observer.OnCompleted();
                  base.Dispose();
              }
          }

          // True when every flag is set (also true for an empty array).
          private static bool AllSet(bool[] flags)
          {
              for (var i = 0; i < flags.Length; i++)
              {
                  if (!flags[i])
                  {
                      return false;
                  }
              }

              return true;
          }

          // True when every flag other than the one at position 'skip' is set.
          private static bool AllSetExcept(bool[] flags, int skip)
          {
              for (var i = 0; i < flags.Length; i++)
              {
                  if (i != skip && !flags[i])
                  {
                      return false;
                  }
              }

              return true;
          }

          // Per-source observer that tags notifications with the source
          // index and hands them to the owning sink.
          class O : IObserver<TSource>
          {
              private readonly _ _parent;
              private readonly int _index;

              public O(_ parent, int index)
              {
                  _parent = parent;
                  _index = index;
              }

              public void OnNext(TSource value)
              {
                  _parent.OnNext(_index, value);
              }

              public void OnError(Exception error)
              {
                  _parent.OnError(error);
              }

              public void OnCompleted()
              {
                  _parent.OnCompleted(_index);
              }
          }
      }
  }
  1608. #endregion
  1609. }
  1610. #endif