Plan.cs 61 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. namespace System.Reactive.Joins
  6. {
  7. /// <summary>
  8. /// Represents an execution plan for join patterns.
  9. /// </summary>
  10. /// <typeparam name="TResult">The type of the results produced by the plan.</typeparam>
  11. public abstract class Plan<TResult>
  12. {
  13. internal Plan()
  14. {
  15. }
  16. internal abstract ActivePlan Activate(Dictionary<object, IJoinObserver> externalSubscriptions,
  17. IObserver<TResult> observer, Action<ActivePlan> deactivate);
  18. internal static JoinObserver<TSource> CreateObserver<TSource>(
  19. Dictionary<object, IJoinObserver> externalSubscriptions, IObservable<TSource> observable, Action<Exception> onError)
  20. {
  21. var observer = default(JoinObserver<TSource>);
  22. var nonGeneric = default(IJoinObserver);
  23. if (!externalSubscriptions.TryGetValue(observable, out nonGeneric))
  24. {
  25. observer = new JoinObserver<TSource>(observable, onError);
  26. externalSubscriptions.Add(observable, observer);
  27. }
  28. else
  29. {
  30. observer = (JoinObserver<TSource>) nonGeneric;
  31. }
  32. return observer;
  33. }
  34. }
  35. internal class Plan<T1, TResult> : Plan<TResult>
  36. {
  37. internal Pattern<T1> Expression { get; private set; }
  38. internal Func<T1, TResult> Selector { get; private set; }
  39. internal Plan(Pattern<T1> expression, Func<T1, TResult> selector)
  40. {
  41. Expression = expression;
  42. Selector = selector;
  43. }
  44. internal override ActivePlan Activate(Dictionary<object, IJoinObserver> externalSubscriptions,
  45. IObserver<TResult> observer, Action<ActivePlan> deactivate)
  46. {
  47. var onError = new Action<Exception>(observer.OnError);
  48. var firstJoinObserver = CreateObserver(externalSubscriptions, Expression.First, onError);
  49. var activePlan = default(ActivePlan<T1>);
  50. activePlan = new ActivePlan<T1>(firstJoinObserver,
  51. first =>
  52. {
  53. var result = default(TResult);
  54. try
  55. {
  56. result = Selector(first);
  57. }
  58. catch (Exception exception)
  59. {
  60. observer.OnError(exception);
  61. return;
  62. }
  63. observer.OnNext(result);
  64. },
  65. () =>
  66. {
  67. firstJoinObserver.RemoveActivePlan(activePlan);
  68. deactivate(activePlan);
  69. });
  70. firstJoinObserver.AddActivePlan(activePlan);
  71. return activePlan;
  72. }
  73. }
  74. internal class Plan<T1, T2, TResult> : Plan<TResult>
  75. {
  76. internal Pattern<T1, T2> Expression { get; private set; }
  77. internal Func<T1, T2, TResult> Selector { get; private set; }
  78. internal Plan(Pattern<T1, T2> expression, Func<T1, T2, TResult> selector)
  79. {
  80. Expression = expression;
  81. Selector = selector;
  82. }
  83. internal override ActivePlan Activate(Dictionary<object, IJoinObserver> externalSubscriptions,
  84. IObserver<TResult> observer, Action<ActivePlan> deactivate)
  85. {
  86. var onError = new Action<Exception>(observer.OnError);
  87. var firstJoinObserver = CreateObserver(externalSubscriptions, Expression.First, onError);
  88. var secondJoinObserver = CreateObserver(externalSubscriptions, Expression.Second, onError);
  89. var activePlan = default(ActivePlan<T1, T2>);
  90. activePlan = new ActivePlan<T1, T2>(firstJoinObserver, secondJoinObserver,
  91. (first, second) =>
  92. {
  93. var result = default(TResult);
  94. try
  95. {
  96. result = Selector(first, second);
  97. }
  98. catch (Exception exception)
  99. {
  100. observer.OnError(exception);
  101. return;
  102. }
  103. observer.OnNext(result);
  104. },
  105. () =>
  106. {
  107. firstJoinObserver.RemoveActivePlan(activePlan);
  108. secondJoinObserver.RemoveActivePlan(activePlan);
  109. deactivate(activePlan);
  110. });
  111. firstJoinObserver.AddActivePlan(activePlan);
  112. secondJoinObserver.AddActivePlan(activePlan);
  113. return activePlan;
  114. }
  115. }
  116. internal class Plan<T1, T2, T3, TResult> : Plan<TResult>
  117. {
  118. internal Pattern<T1, T2, T3> Expression { get; private set; }
  119. internal Func<T1, T2, T3, TResult> Selector { get; private set; }
  120. internal Plan(Pattern<T1, T2, T3> expression, Func<T1, T2, T3, TResult> selector)
  121. {
  122. Expression = expression;
  123. Selector = selector;
  124. }
  125. internal override ActivePlan Activate(Dictionary<object, IJoinObserver> externalSubscriptions,
  126. IObserver<TResult> observer, Action<ActivePlan> deactivate)
  127. {
  128. var onError = new Action<Exception>(observer.OnError);
  129. var firstJoinObserver = CreateObserver(externalSubscriptions, Expression.First, onError);
  130. var secondJoinObserver = CreateObserver(externalSubscriptions, Expression.Second, onError);
  131. var thirdJoinObserver = CreateObserver(externalSubscriptions, Expression.Third, onError);
  132. var activePlan = default(ActivePlan<T1, T2, T3>);
  133. activePlan = new ActivePlan<T1, T2, T3>(firstJoinObserver, secondJoinObserver, thirdJoinObserver,
  134. (first, second, third) =>
  135. {
  136. var result = default(TResult);
  137. try
  138. {
  139. result = Selector(first, second, third);
  140. }
  141. catch (Exception exception)
  142. {
  143. observer.OnError(exception);
  144. return;
  145. }
  146. observer.OnNext(result);
  147. },
  148. () =>
  149. {
  150. firstJoinObserver.RemoveActivePlan(activePlan);
  151. secondJoinObserver.RemoveActivePlan(activePlan);
  152. thirdJoinObserver.RemoveActivePlan(activePlan);
  153. deactivate(activePlan);
  154. });
  155. firstJoinObserver.AddActivePlan(activePlan);
  156. secondJoinObserver.AddActivePlan(activePlan);
  157. thirdJoinObserver.AddActivePlan(activePlan);
  158. return activePlan;
  159. }
  160. }
  161. internal class Plan<T1, T2, T3, T4, TResult> : Plan<TResult>
  162. {
  163. internal Pattern<T1, T2, T3, T4> Expression { get; private set; }
  164. internal Func<T1, T2, T3, T4, TResult> Selector { get; private set; }
  165. internal Plan(Pattern<T1, T2, T3, T4> expression,
  166. Func<T1, T2, T3, T4, TResult> selector)
  167. {
  168. Expression = expression;
  169. Selector = selector;
  170. }
  171. internal override ActivePlan Activate(Dictionary<object, IJoinObserver> externalSubscriptions,
  172. IObserver<TResult> observer, Action<ActivePlan> deactivate)
  173. {
  174. var onError = new Action<Exception>(observer.OnError);
  175. var firstJoinObserver = CreateObserver(externalSubscriptions, Expression.First, onError);
  176. var secondJoinObserver = CreateObserver(externalSubscriptions, Expression.Second, onError);
  177. var thirdJoinObserver = CreateObserver(externalSubscriptions, Expression.Third, onError);
  178. var fourthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fourth, onError);
  179. var activePlan = default(ActivePlan<T1, T2, T3, T4>);
  180. activePlan = new ActivePlan<T1, T2, T3, T4>(firstJoinObserver, secondJoinObserver, thirdJoinObserver, fourthJoinObserver,
  181. (first, second, third, fourth) =>
  182. {
  183. var result = default(TResult);
  184. try
  185. {
  186. result = Selector(first, second, third, fourth);
  187. }
  188. catch (Exception exception)
  189. {
  190. observer.OnError(exception);
  191. return;
  192. }
  193. observer.OnNext(result);
  194. },
  195. () =>
  196. {
  197. firstJoinObserver.RemoveActivePlan(activePlan);
  198. secondJoinObserver.RemoveActivePlan(activePlan);
  199. thirdJoinObserver.RemoveActivePlan(activePlan);
  200. fourthJoinObserver.RemoveActivePlan(activePlan);
  201. deactivate(activePlan);
  202. });
  203. firstJoinObserver.AddActivePlan(activePlan);
  204. secondJoinObserver.AddActivePlan(activePlan);
  205. thirdJoinObserver.AddActivePlan(activePlan);
  206. fourthJoinObserver.AddActivePlan(activePlan);
  207. return activePlan;
  208. }
  209. }
  210. #if !NO_LARGEARITY
  211. internal class Plan<T1, T2, T3, T4, T5, TResult> : Plan<TResult>
  212. {
  213. internal Pattern<T1, T2, T3, T4, T5> Expression { get; private set; }
  214. internal Func<T1, T2, T3, T4, T5, TResult> Selector { get; private set; }
  215. internal Plan(Pattern<T1, T2, T3, T4, T5> expression,
  216. Func<T1, T2, T3, T4, T5, TResult> selector)
  217. {
  218. Expression = expression;
  219. Selector = selector;
  220. }
  221. internal override ActivePlan Activate(Dictionary<object, IJoinObserver> externalSubscriptions,
  222. IObserver<TResult> observer, Action<ActivePlan> deactivate)
  223. {
  224. var onError = new Action<Exception>(observer.OnError);
  225. var firstJoinObserver = CreateObserver(externalSubscriptions, Expression.First, onError);
  226. var secondJoinObserver = CreateObserver(externalSubscriptions, Expression.Second, onError);
  227. var thirdJoinObserver = CreateObserver(externalSubscriptions, Expression.Third, onError);
  228. var fourthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fourth, onError);
  229. var fifthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fifth, onError);
  230. var activePlan = default(ActivePlan<T1, T2, T3, T4, T5>);
  231. activePlan = new ActivePlan<T1, T2, T3, T4, T5>(firstJoinObserver, secondJoinObserver, thirdJoinObserver, fourthJoinObserver, fifthJoinObserver,
  232. (first, second, third, fourth, fifth) =>
  233. {
  234. var result = default(TResult);
  235. try
  236. {
  237. result = Selector(first, second, third, fourth, fifth);
  238. }
  239. catch (Exception exception)
  240. {
  241. observer.OnError(exception);
  242. return;
  243. }
  244. observer.OnNext(result);
  245. },
  246. () =>
  247. {
  248. firstJoinObserver.RemoveActivePlan(activePlan);
  249. secondJoinObserver.RemoveActivePlan(activePlan);
  250. thirdJoinObserver.RemoveActivePlan(activePlan);
  251. fourthJoinObserver.RemoveActivePlan(activePlan);
  252. fifthJoinObserver.RemoveActivePlan(activePlan);
  253. deactivate(activePlan);
  254. });
  255. firstJoinObserver.AddActivePlan(activePlan);
  256. secondJoinObserver.AddActivePlan(activePlan);
  257. thirdJoinObserver.AddActivePlan(activePlan);
  258. fourthJoinObserver.AddActivePlan(activePlan);
  259. fifthJoinObserver.AddActivePlan(activePlan);
  260. return activePlan;
  261. }
  262. }
  263. internal class Plan<T1, T2, T3, T4, T5, T6, TResult> : Plan<TResult>
  264. {
  265. internal Pattern<T1, T2, T3, T4, T5, T6> Expression { get; private set; }
  266. internal Func<T1, T2, T3, T4, T5, T6, TResult> Selector { get; private set; }
  267. internal Plan(Pattern<T1, T2, T3, T4, T5, T6> expression,
  268. Func<T1, T2, T3, T4, T5, T6, TResult> selector)
  269. {
  270. Expression = expression;
  271. Selector = selector;
  272. }
  273. internal override ActivePlan Activate(Dictionary<object, IJoinObserver> externalSubscriptions,
  274. IObserver<TResult> observer, Action<ActivePlan> deactivate)
  275. {
  276. var onError = new Action<Exception>(observer.OnError);
  277. var firstJoinObserver = CreateObserver(externalSubscriptions, Expression.First, onError);
  278. var secondJoinObserver = CreateObserver(externalSubscriptions, Expression.Second, onError);
  279. var thirdJoinObserver = CreateObserver(externalSubscriptions, Expression.Third, onError);
  280. var fourthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fourth, onError);
  281. var fifthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fifth, onError);
  282. var sixthJoinObserver = CreateObserver(externalSubscriptions, Expression.Sixth, onError);
  283. var activePlan = default(ActivePlan<T1, T2, T3, T4, T5, T6>);
  284. activePlan = new ActivePlan<T1, T2, T3, T4, T5, T6>(firstJoinObserver, secondJoinObserver, thirdJoinObserver,
  285. fourthJoinObserver, fifthJoinObserver, sixthJoinObserver,
  286. (first, second, third, fourth, fifth, sixth) =>
  287. {
  288. var result = default(TResult);
  289. try
  290. {
  291. result = Selector(first, second, third, fourth, fifth, sixth);
  292. }
  293. catch (Exception exception)
  294. {
  295. observer.OnError(exception);
  296. return;
  297. }
  298. observer.OnNext(result);
  299. },
  300. () =>
  301. {
  302. firstJoinObserver.RemoveActivePlan(activePlan);
  303. secondJoinObserver.RemoveActivePlan(activePlan);
  304. thirdJoinObserver.RemoveActivePlan(activePlan);
  305. fourthJoinObserver.RemoveActivePlan(activePlan);
  306. fifthJoinObserver.RemoveActivePlan(activePlan);
  307. sixthJoinObserver.RemoveActivePlan(activePlan);
  308. deactivate(activePlan);
  309. });
  310. firstJoinObserver.AddActivePlan(activePlan);
  311. secondJoinObserver.AddActivePlan(activePlan);
  312. thirdJoinObserver.AddActivePlan(activePlan);
  313. fourthJoinObserver.AddActivePlan(activePlan);
  314. fifthJoinObserver.AddActivePlan(activePlan);
  315. sixthJoinObserver.AddActivePlan(activePlan);
  316. return activePlan;
  317. }
  318. }
  319. internal class Plan<T1, T2, T3, T4, T5, T6, T7, TResult> : Plan<TResult>
  320. {
  321. internal Pattern<T1, T2, T3, T4, T5, T6, T7> Expression { get; private set; }
  322. internal Func<T1, T2, T3, T4, T5, T6, T7, TResult> Selector { get; private set; }
  323. internal Plan(Pattern<T1, T2, T3, T4, T5, T6, T7> expression,
  324. Func<T1, T2, T3, T4, T5, T6, T7, TResult> selector)
  325. {
  326. Expression = expression;
  327. Selector = selector;
  328. }
  329. internal override ActivePlan Activate(Dictionary<object, IJoinObserver> externalSubscriptions,
  330. IObserver<TResult> observer, Action<ActivePlan> deactivate)
  331. {
  332. var onError = new Action<Exception>(observer.OnError);
  333. var firstJoinObserver = CreateObserver(externalSubscriptions, Expression.First, onError);
  334. var secondJoinObserver = CreateObserver(externalSubscriptions, Expression.Second, onError);
  335. var thirdJoinObserver = CreateObserver(externalSubscriptions, Expression.Third, onError);
  336. var fourthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fourth, onError);
  337. var fifthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fifth, onError);
  338. var sixthJoinObserver = CreateObserver(externalSubscriptions, Expression.Sixth, onError);
  339. var seventhJoinObserver = CreateObserver(externalSubscriptions, Expression.Seventh, onError);
  340. var activePlan = default(ActivePlan<T1, T2, T3, T4, T5, T6, T7>);
  341. activePlan = new ActivePlan<T1, T2, T3, T4, T5, T6, T7>(firstJoinObserver, secondJoinObserver, thirdJoinObserver,
  342. fourthJoinObserver, fifthJoinObserver, sixthJoinObserver, seventhJoinObserver,
  343. (first, second, third, fourth, fifth, sixth, seventh) =>
  344. {
  345. var result = default(TResult);
  346. try
  347. {
  348. result = Selector(first, second, third, fourth, fifth, sixth, seventh);
  349. }
  350. catch (Exception exception)
  351. {
  352. observer.OnError(exception);
  353. return;
  354. }
  355. observer.OnNext(result);
  356. },
  357. () =>
  358. {
  359. firstJoinObserver.RemoveActivePlan(activePlan);
  360. secondJoinObserver.RemoveActivePlan(activePlan);
  361. thirdJoinObserver.RemoveActivePlan(activePlan);
  362. fourthJoinObserver.RemoveActivePlan(activePlan);
  363. fifthJoinObserver.RemoveActivePlan(activePlan);
  364. sixthJoinObserver.RemoveActivePlan(activePlan);
  365. seventhJoinObserver.RemoveActivePlan(activePlan);
  366. deactivate(activePlan);
  367. });
  368. firstJoinObserver.AddActivePlan(activePlan);
  369. secondJoinObserver.AddActivePlan(activePlan);
  370. thirdJoinObserver.AddActivePlan(activePlan);
  371. fourthJoinObserver.AddActivePlan(activePlan);
  372. fifthJoinObserver.AddActivePlan(activePlan);
  373. sixthJoinObserver.AddActivePlan(activePlan);
  374. seventhJoinObserver.AddActivePlan(activePlan);
  375. return activePlan;
  376. }
  377. }
  378. internal class Plan<T1, T2, T3, T4, T5, T6, T7, T8, TResult> : Plan<TResult>
  379. {
  380. internal Pattern<T1, T2, T3, T4, T5, T6, T7, T8> Expression { get; private set; }
  381. internal Func<T1, T2, T3, T4, T5, T6, T7, T8, TResult> Selector { get; private set; }
  382. internal Plan(Pattern<T1, T2, T3, T4, T5, T6, T7, T8> expression,
  383. Func<T1, T2, T3, T4, T5, T6, T7, T8, TResult> selector)
  384. {
  385. Expression = expression;
  386. Selector = selector;
  387. }
  388. internal override ActivePlan Activate(Dictionary<object, IJoinObserver> externalSubscriptions,
  389. IObserver<TResult> observer, Action<ActivePlan> deactivate)
  390. {
  391. var onError = new Action<Exception>(observer.OnError);
  392. var firstJoinObserver = CreateObserver(externalSubscriptions, Expression.First, onError);
  393. var secondJoinObserver = CreateObserver(externalSubscriptions, Expression.Second, onError);
  394. var thirdJoinObserver = CreateObserver(externalSubscriptions, Expression.Third, onError);
  395. var fourthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fourth, onError);
  396. var fifthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fifth, onError);
  397. var sixthJoinObserver = CreateObserver(externalSubscriptions, Expression.Sixth, onError);
  398. var seventhJoinObserver = CreateObserver(externalSubscriptions, Expression.Seventh, onError);
  399. var eighthJoinObserver = CreateObserver(externalSubscriptions, Expression.Eighth, onError);
  400. var activePlan = default(ActivePlan<T1, T2, T3, T4, T5, T6, T7, T8>);
  401. activePlan = new ActivePlan<T1, T2, T3, T4, T5, T6, T7, T8>(firstJoinObserver, secondJoinObserver, thirdJoinObserver,
  402. fourthJoinObserver, fifthJoinObserver, sixthJoinObserver, seventhJoinObserver, eighthJoinObserver,
  403. (first, second, third, fourth, fifth, sixth, seventh, eighth) =>
  404. {
  405. var result = default(TResult);
  406. try
  407. {
  408. result = Selector(first, second, third, fourth, fifth, sixth, seventh, eighth);
  409. }
  410. catch (Exception exception)
  411. {
  412. observer.OnError(exception);
  413. return;
  414. }
  415. observer.OnNext(result);
  416. },
  417. () =>
  418. {
  419. firstJoinObserver.RemoveActivePlan(activePlan);
  420. secondJoinObserver.RemoveActivePlan(activePlan);
  421. thirdJoinObserver.RemoveActivePlan(activePlan);
  422. fourthJoinObserver.RemoveActivePlan(activePlan);
  423. fifthJoinObserver.RemoveActivePlan(activePlan);
  424. sixthJoinObserver.RemoveActivePlan(activePlan);
  425. seventhJoinObserver.RemoveActivePlan(activePlan);
  426. eighthJoinObserver.RemoveActivePlan(activePlan);
  427. deactivate(activePlan);
  428. });
  429. firstJoinObserver.AddActivePlan(activePlan);
  430. secondJoinObserver.AddActivePlan(activePlan);
  431. thirdJoinObserver.AddActivePlan(activePlan);
  432. fourthJoinObserver.AddActivePlan(activePlan);
  433. fifthJoinObserver.AddActivePlan(activePlan);
  434. sixthJoinObserver.AddActivePlan(activePlan);
  435. seventhJoinObserver.AddActivePlan(activePlan);
  436. eighthJoinObserver.AddActivePlan(activePlan);
  437. return activePlan;
  438. }
  439. }
  440. internal class Plan<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult> : Plan<TResult>
  441. {
  442. internal Pattern<T1, T2, T3, T4, T5, T6, T7, T8, T9> Expression { get; private set; }
  443. internal Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult> Selector { get; private set; }
  444. internal Plan(Pattern<T1, T2, T3, T4, T5, T6, T7, T8, T9> expression,
  445. Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult> selector)
  446. {
  447. Expression = expression;
  448. Selector = selector;
  449. }
  450. internal override ActivePlan Activate(Dictionary<object, IJoinObserver> externalSubscriptions,
  451. IObserver<TResult> observer, Action<ActivePlan> deactivate)
  452. {
  453. var onError = new Action<Exception>(observer.OnError);
  454. var firstJoinObserver = CreateObserver(externalSubscriptions, Expression.First, onError);
  455. var secondJoinObserver = CreateObserver(externalSubscriptions, Expression.Second, onError);
  456. var thirdJoinObserver = CreateObserver(externalSubscriptions, Expression.Third, onError);
  457. var fourthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fourth, onError);
  458. var fifthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fifth, onError);
  459. var sixthJoinObserver = CreateObserver(externalSubscriptions, Expression.Sixth, onError);
  460. var seventhJoinObserver = CreateObserver(externalSubscriptions, Expression.Seventh, onError);
  461. var eighthJoinObserver = CreateObserver(externalSubscriptions, Expression.Eighth, onError);
  462. var ninthJoinObserver = CreateObserver(externalSubscriptions, Expression.Ninth, onError);
  463. var activePlan = default(ActivePlan<T1, T2, T3, T4, T5, T6, T7, T8, T9>);
  464. activePlan = new ActivePlan<T1, T2, T3, T4, T5, T6, T7, T8, T9>(firstJoinObserver, secondJoinObserver, thirdJoinObserver,
  465. fourthJoinObserver, fifthJoinObserver, sixthJoinObserver, seventhJoinObserver, eighthJoinObserver, ninthJoinObserver,
  466. (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth) =>
  467. {
  468. var result = default(TResult);
  469. try
  470. {
  471. result = Selector(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth);
  472. }
  473. catch (Exception exception)
  474. {
  475. observer.OnError(exception);
  476. return;
  477. }
  478. observer.OnNext(result);
  479. },
  480. () =>
  481. {
  482. firstJoinObserver.RemoveActivePlan(activePlan);
  483. secondJoinObserver.RemoveActivePlan(activePlan);
  484. thirdJoinObserver.RemoveActivePlan(activePlan);
  485. fourthJoinObserver.RemoveActivePlan(activePlan);
  486. fifthJoinObserver.RemoveActivePlan(activePlan);
  487. sixthJoinObserver.RemoveActivePlan(activePlan);
  488. seventhJoinObserver.RemoveActivePlan(activePlan);
  489. eighthJoinObserver.RemoveActivePlan(activePlan);
  490. ninthJoinObserver.RemoveActivePlan(activePlan);
  491. deactivate(activePlan);
  492. });
  493. firstJoinObserver.AddActivePlan(activePlan);
  494. secondJoinObserver.AddActivePlan(activePlan);
  495. thirdJoinObserver.AddActivePlan(activePlan);
  496. fourthJoinObserver.AddActivePlan(activePlan);
  497. fifthJoinObserver.AddActivePlan(activePlan);
  498. sixthJoinObserver.AddActivePlan(activePlan);
  499. seventhJoinObserver.AddActivePlan(activePlan);
  500. eighthJoinObserver.AddActivePlan(activePlan);
  501. ninthJoinObserver.AddActivePlan(activePlan);
  502. return activePlan;
  503. }
  504. }
  505. internal class Plan<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult> : Plan<TResult>
  506. {
  507. internal Pattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> Expression { get; private set; }
  508. internal Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult> Selector { get; private set; }
  509. internal Plan(Pattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> expression,
  510. Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult> selector)
  511. {
  512. Expression = expression;
  513. Selector = selector;
  514. }
  515. internal override ActivePlan Activate(Dictionary<object, IJoinObserver> externalSubscriptions,
  516. IObserver<TResult> observer, Action<ActivePlan> deactivate)
  517. {
  518. var onError = new Action<Exception>(observer.OnError);
  519. var firstJoinObserver = CreateObserver(externalSubscriptions, Expression.First, onError);
  520. var secondJoinObserver = CreateObserver(externalSubscriptions, Expression.Second, onError);
  521. var thirdJoinObserver = CreateObserver(externalSubscriptions, Expression.Third, onError);
  522. var fourthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fourth, onError);
  523. var fifthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fifth, onError);
  524. var sixthJoinObserver = CreateObserver(externalSubscriptions, Expression.Sixth, onError);
  525. var seventhJoinObserver = CreateObserver(externalSubscriptions, Expression.Seventh, onError);
  526. var eighthJoinObserver = CreateObserver(externalSubscriptions, Expression.Eighth, onError);
  527. var ninthJoinObserver = CreateObserver(externalSubscriptions, Expression.Ninth, onError);
  528. var tenthJoinObserver = CreateObserver(externalSubscriptions, Expression.Tenth, onError);
  529. var activePlan = default(ActivePlan<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>);
  530. activePlan = new ActivePlan<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(firstJoinObserver, secondJoinObserver, thirdJoinObserver,
  531. fourthJoinObserver, fifthJoinObserver, sixthJoinObserver, seventhJoinObserver, eighthJoinObserver, ninthJoinObserver, tenthJoinObserver,
  532. (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth) =>
  533. {
  534. var result = default(TResult);
  535. try
  536. {
  537. result = Selector(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth);
  538. }
  539. catch (Exception exception)
  540. {
  541. observer.OnError(exception);
  542. return;
  543. }
  544. observer.OnNext(result);
  545. },
  546. () =>
  547. {
  548. firstJoinObserver.RemoveActivePlan(activePlan);
  549. secondJoinObserver.RemoveActivePlan(activePlan);
  550. thirdJoinObserver.RemoveActivePlan(activePlan);
  551. fourthJoinObserver.RemoveActivePlan(activePlan);
  552. fifthJoinObserver.RemoveActivePlan(activePlan);
  553. sixthJoinObserver.RemoveActivePlan(activePlan);
  554. seventhJoinObserver.RemoveActivePlan(activePlan);
  555. eighthJoinObserver.RemoveActivePlan(activePlan);
  556. ninthJoinObserver.RemoveActivePlan(activePlan);
  557. tenthJoinObserver.RemoveActivePlan(activePlan);
  558. deactivate(activePlan);
  559. });
  560. firstJoinObserver.AddActivePlan(activePlan);
  561. secondJoinObserver.AddActivePlan(activePlan);
  562. thirdJoinObserver.AddActivePlan(activePlan);
  563. fourthJoinObserver.AddActivePlan(activePlan);
  564. fifthJoinObserver.AddActivePlan(activePlan);
  565. sixthJoinObserver.AddActivePlan(activePlan);
  566. seventhJoinObserver.AddActivePlan(activePlan);
  567. eighthJoinObserver.AddActivePlan(activePlan);
  568. ninthJoinObserver.AddActivePlan(activePlan);
  569. tenthJoinObserver.AddActivePlan(activePlan);
  570. return activePlan;
  571. }
  572. }
  573. internal class Plan<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult> : Plan<TResult>
  574. {
  575. internal Pattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11> Expression { get; private set; }
  576. internal Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult> Selector { get; private set; }
  577. internal Plan(Pattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11> expression,
  578. Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult> selector)
  579. {
  580. Expression = expression;
  581. Selector = selector;
  582. }
  583. internal override ActivePlan Activate(Dictionary<object, IJoinObserver> externalSubscriptions,
  584. IObserver<TResult> observer, Action<ActivePlan> deactivate)
  585. {
  586. var onError = new Action<Exception>(observer.OnError);
  587. var firstJoinObserver = CreateObserver(externalSubscriptions, Expression.First, onError);
  588. var secondJoinObserver = CreateObserver(externalSubscriptions, Expression.Second, onError);
  589. var thirdJoinObserver = CreateObserver(externalSubscriptions, Expression.Third, onError);
  590. var fourthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fourth, onError);
  591. var fifthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fifth, onError);
  592. var sixthJoinObserver = CreateObserver(externalSubscriptions, Expression.Sixth, onError);
  593. var seventhJoinObserver = CreateObserver(externalSubscriptions, Expression.Seventh, onError);
  594. var eighthJoinObserver = CreateObserver(externalSubscriptions, Expression.Eighth, onError);
  595. var ninthJoinObserver = CreateObserver(externalSubscriptions, Expression.Ninth, onError);
  596. var tenthJoinObserver = CreateObserver(externalSubscriptions, Expression.Tenth, onError);
  597. var eleventhJoinObserver = CreateObserver(externalSubscriptions, Expression.Eleventh, onError);
  598. var activePlan = default(ActivePlan<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>);
  599. activePlan = new ActivePlan<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>(firstJoinObserver, secondJoinObserver, thirdJoinObserver,
  600. fourthJoinObserver, fifthJoinObserver, sixthJoinObserver, seventhJoinObserver, eighthJoinObserver, ninthJoinObserver, tenthJoinObserver,eleventhJoinObserver,
  601. (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh) =>
  602. {
  603. var result = default(TResult);
  604. try
  605. {
  606. result = Selector(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh);
  607. }
  608. catch (Exception exception)
  609. {
  610. observer.OnError(exception);
  611. return;
  612. }
  613. observer.OnNext(result);
  614. },
  615. () =>
  616. {
  617. firstJoinObserver.RemoveActivePlan(activePlan);
  618. secondJoinObserver.RemoveActivePlan(activePlan);
  619. thirdJoinObserver.RemoveActivePlan(activePlan);
  620. fourthJoinObserver.RemoveActivePlan(activePlan);
  621. fifthJoinObserver.RemoveActivePlan(activePlan);
  622. sixthJoinObserver.RemoveActivePlan(activePlan);
  623. seventhJoinObserver.RemoveActivePlan(activePlan);
  624. eighthJoinObserver.RemoveActivePlan(activePlan);
  625. ninthJoinObserver.RemoveActivePlan(activePlan);
  626. tenthJoinObserver.RemoveActivePlan(activePlan);
  627. eleventhJoinObserver.RemoveActivePlan(activePlan);
  628. deactivate(activePlan);
  629. });
  630. firstJoinObserver.AddActivePlan(activePlan);
  631. secondJoinObserver.AddActivePlan(activePlan);
  632. thirdJoinObserver.AddActivePlan(activePlan);
  633. fourthJoinObserver.AddActivePlan(activePlan);
  634. fifthJoinObserver.AddActivePlan(activePlan);
  635. sixthJoinObserver.AddActivePlan(activePlan);
  636. seventhJoinObserver.AddActivePlan(activePlan);
  637. eighthJoinObserver.AddActivePlan(activePlan);
  638. ninthJoinObserver.AddActivePlan(activePlan);
  639. tenthJoinObserver.AddActivePlan(activePlan);
  640. eleventhJoinObserver.AddActivePlan(activePlan);
  641. return activePlan;
  642. }
  643. }
  644. internal class Plan<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult> : Plan<TResult>
  645. {
  646. internal Pattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12> Expression { get; private set; }
  647. internal Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult> Selector { get; private set; }
  648. internal Plan(Pattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12> expression,
  649. Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult> selector)
  650. {
  651. Expression = expression;
  652. Selector = selector;
  653. }
  654. internal override ActivePlan Activate(Dictionary<object, IJoinObserver> externalSubscriptions,
  655. IObserver<TResult> observer, Action<ActivePlan> deactivate)
  656. {
  657. var onError = new Action<Exception>(observer.OnError);
  658. var firstJoinObserver = CreateObserver(externalSubscriptions, Expression.First, onError);
  659. var secondJoinObserver = CreateObserver(externalSubscriptions, Expression.Second, onError);
  660. var thirdJoinObserver = CreateObserver(externalSubscriptions, Expression.Third, onError);
  661. var fourthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fourth, onError);
  662. var fifthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fifth, onError);
  663. var sixthJoinObserver = CreateObserver(externalSubscriptions, Expression.Sixth, onError);
  664. var seventhJoinObserver = CreateObserver(externalSubscriptions, Expression.Seventh, onError);
  665. var eighthJoinObserver = CreateObserver(externalSubscriptions, Expression.Eighth, onError);
  666. var ninthJoinObserver = CreateObserver(externalSubscriptions, Expression.Ninth, onError);
  667. var tenthJoinObserver = CreateObserver(externalSubscriptions, Expression.Tenth, onError);
  668. var eleventhJoinObserver = CreateObserver(externalSubscriptions, Expression.Eleventh, onError);
  669. var twelfthJoinObserver = CreateObserver(externalSubscriptions, Expression.Twelfth, onError);
  670. var activePlan = default(ActivePlan<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>);
  671. activePlan = new ActivePlan<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>(firstJoinObserver, secondJoinObserver, thirdJoinObserver,
  672. fourthJoinObserver, fifthJoinObserver, sixthJoinObserver, seventhJoinObserver, eighthJoinObserver, ninthJoinObserver, tenthJoinObserver, eleventhJoinObserver,
  673. twelfthJoinObserver,
  674. (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth) =>
  675. {
  676. var result = default(TResult);
  677. try
  678. {
  679. result = Selector(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth);
  680. }
  681. catch (Exception exception)
  682. {
  683. observer.OnError(exception);
  684. return;
  685. }
  686. observer.OnNext(result);
  687. },
  688. () =>
  689. {
  690. firstJoinObserver.RemoveActivePlan(activePlan);
  691. secondJoinObserver.RemoveActivePlan(activePlan);
  692. thirdJoinObserver.RemoveActivePlan(activePlan);
  693. fourthJoinObserver.RemoveActivePlan(activePlan);
  694. fifthJoinObserver.RemoveActivePlan(activePlan);
  695. sixthJoinObserver.RemoveActivePlan(activePlan);
  696. seventhJoinObserver.RemoveActivePlan(activePlan);
  697. eighthJoinObserver.RemoveActivePlan(activePlan);
  698. ninthJoinObserver.RemoveActivePlan(activePlan);
  699. tenthJoinObserver.RemoveActivePlan(activePlan);
  700. eleventhJoinObserver.RemoveActivePlan(activePlan);
  701. twelfthJoinObserver.RemoveActivePlan(activePlan);
  702. deactivate(activePlan);
  703. });
  704. firstJoinObserver.AddActivePlan(activePlan);
  705. secondJoinObserver.AddActivePlan(activePlan);
  706. thirdJoinObserver.AddActivePlan(activePlan);
  707. fourthJoinObserver.AddActivePlan(activePlan);
  708. fifthJoinObserver.AddActivePlan(activePlan);
  709. sixthJoinObserver.AddActivePlan(activePlan);
  710. seventhJoinObserver.AddActivePlan(activePlan);
  711. eighthJoinObserver.AddActivePlan(activePlan);
  712. ninthJoinObserver.AddActivePlan(activePlan);
  713. tenthJoinObserver.AddActivePlan(activePlan);
  714. eleventhJoinObserver.AddActivePlan(activePlan);
  715. twelfthJoinObserver.AddActivePlan(activePlan);
  716. return activePlan;
  717. }
  718. }
  719. internal class Plan<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult> : Plan<TResult>
  720. {
  721. internal Pattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13> Expression { get; private set; }
  722. internal Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult> Selector { get; private set; }
  723. internal Plan(Pattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13> expression,
  724. Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult> selector)
  725. {
  726. Expression = expression;
  727. Selector = selector;
  728. }
  729. internal override ActivePlan Activate(Dictionary<object, IJoinObserver> externalSubscriptions,
  730. IObserver<TResult> observer, Action<ActivePlan> deactivate)
  731. {
  732. var onError = new Action<Exception>(observer.OnError);
  733. var firstJoinObserver = CreateObserver(externalSubscriptions, Expression.First, onError);
  734. var secondJoinObserver = CreateObserver(externalSubscriptions, Expression.Second, onError);
  735. var thirdJoinObserver = CreateObserver(externalSubscriptions, Expression.Third, onError);
  736. var fourthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fourth, onError);
  737. var fifthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fifth, onError);
  738. var sixthJoinObserver = CreateObserver(externalSubscriptions, Expression.Sixth, onError);
  739. var seventhJoinObserver = CreateObserver(externalSubscriptions, Expression.Seventh, onError);
  740. var eighthJoinObserver = CreateObserver(externalSubscriptions, Expression.Eighth, onError);
  741. var ninthJoinObserver = CreateObserver(externalSubscriptions, Expression.Ninth, onError);
  742. var tenthJoinObserver = CreateObserver(externalSubscriptions, Expression.Tenth, onError);
  743. var eleventhJoinObserver = CreateObserver(externalSubscriptions, Expression.Eleventh, onError);
  744. var twelfthJoinObserver = CreateObserver(externalSubscriptions, Expression.Twelfth, onError);
  745. var thirteenthJoinObserver = CreateObserver(externalSubscriptions, Expression.Thirteenth, onError);
  746. var activePlan = default(ActivePlan<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>);
  747. activePlan = new ActivePlan<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>(firstJoinObserver, secondJoinObserver, thirdJoinObserver,
  748. fourthJoinObserver, fifthJoinObserver, sixthJoinObserver, seventhJoinObserver, eighthJoinObserver, ninthJoinObserver, tenthJoinObserver, eleventhJoinObserver,
  749. twelfthJoinObserver, thirteenthJoinObserver,
  750. (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth) =>
  751. {
  752. var result = default(TResult);
  753. try
  754. {
  755. result = Selector(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth);
  756. }
  757. catch (Exception exception)
  758. {
  759. observer.OnError(exception);
  760. return;
  761. }
  762. observer.OnNext(result);
  763. },
  764. () =>
  765. {
  766. firstJoinObserver.RemoveActivePlan(activePlan);
  767. secondJoinObserver.RemoveActivePlan(activePlan);
  768. thirdJoinObserver.RemoveActivePlan(activePlan);
  769. fourthJoinObserver.RemoveActivePlan(activePlan);
  770. fifthJoinObserver.RemoveActivePlan(activePlan);
  771. sixthJoinObserver.RemoveActivePlan(activePlan);
  772. seventhJoinObserver.RemoveActivePlan(activePlan);
  773. eighthJoinObserver.RemoveActivePlan(activePlan);
  774. ninthJoinObserver.RemoveActivePlan(activePlan);
  775. tenthJoinObserver.RemoveActivePlan(activePlan);
  776. eleventhJoinObserver.RemoveActivePlan(activePlan);
  777. twelfthJoinObserver.RemoveActivePlan(activePlan);
  778. thirteenthJoinObserver.RemoveActivePlan(activePlan);
  779. deactivate(activePlan);
  780. });
  781. firstJoinObserver.AddActivePlan(activePlan);
  782. secondJoinObserver.AddActivePlan(activePlan);
  783. thirdJoinObserver.AddActivePlan(activePlan);
  784. fourthJoinObserver.AddActivePlan(activePlan);
  785. fifthJoinObserver.AddActivePlan(activePlan);
  786. sixthJoinObserver.AddActivePlan(activePlan);
  787. seventhJoinObserver.AddActivePlan(activePlan);
  788. eighthJoinObserver.AddActivePlan(activePlan);
  789. ninthJoinObserver.AddActivePlan(activePlan);
  790. tenthJoinObserver.AddActivePlan(activePlan);
  791. eleventhJoinObserver.AddActivePlan(activePlan);
  792. twelfthJoinObserver.AddActivePlan(activePlan);
  793. thirteenthJoinObserver.AddActivePlan(activePlan);
  794. return activePlan;
  795. }
  796. }
  797. internal class Plan<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult> : Plan<TResult>
  798. {
  799. internal Pattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14> Expression { get; private set; }
  800. internal Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult> Selector { get; private set; }
  801. internal Plan(Pattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14> expression,
  802. Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult> selector)
  803. {
  804. Expression = expression;
  805. Selector = selector;
  806. }
  807. internal override ActivePlan Activate(Dictionary<object, IJoinObserver> externalSubscriptions,
  808. IObserver<TResult> observer, Action<ActivePlan> deactivate)
  809. {
  810. var onError = new Action<Exception>(observer.OnError);
  811. var firstJoinObserver = CreateObserver(externalSubscriptions, Expression.First, onError);
  812. var secondJoinObserver = CreateObserver(externalSubscriptions, Expression.Second, onError);
  813. var thirdJoinObserver = CreateObserver(externalSubscriptions, Expression.Third, onError);
  814. var fourthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fourth, onError);
  815. var fifthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fifth, onError);
  816. var sixthJoinObserver = CreateObserver(externalSubscriptions, Expression.Sixth, onError);
  817. var seventhJoinObserver = CreateObserver(externalSubscriptions, Expression.Seventh, onError);
  818. var eighthJoinObserver = CreateObserver(externalSubscriptions, Expression.Eighth, onError);
  819. var ninthJoinObserver = CreateObserver(externalSubscriptions, Expression.Ninth, onError);
  820. var tenthJoinObserver = CreateObserver(externalSubscriptions, Expression.Tenth, onError);
  821. var eleventhJoinObserver = CreateObserver(externalSubscriptions, Expression.Eleventh, onError);
  822. var twelfthJoinObserver = CreateObserver(externalSubscriptions, Expression.Twelfth, onError);
  823. var thirteenthJoinObserver = CreateObserver(externalSubscriptions, Expression.Thirteenth, onError);
  824. var fourteenthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fourteenth, onError);
  825. var activePlan = default(ActivePlan<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>);
  826. activePlan = new ActivePlan<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>(
  827. firstJoinObserver, secondJoinObserver, thirdJoinObserver,
  828. fourthJoinObserver, fifthJoinObserver, sixthJoinObserver,
  829. seventhJoinObserver, eighthJoinObserver, ninthJoinObserver,
  830. tenthJoinObserver, eleventhJoinObserver,
  831. twelfthJoinObserver, thirteenthJoinObserver,
  832. fourteenthJoinObserver,
  833. (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth) =>
  834. {
  835. var result = default(TResult);
  836. try
  837. {
  838. result = Selector(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth);
  839. }
  840. catch (Exception exception)
  841. {
  842. observer.OnError(exception);
  843. return;
  844. }
  845. observer.OnNext(result);
  846. },
  847. () =>
  848. {
  849. firstJoinObserver.RemoveActivePlan(activePlan);
  850. secondJoinObserver.RemoveActivePlan(activePlan);
  851. thirdJoinObserver.RemoveActivePlan(activePlan);
  852. fourthJoinObserver.RemoveActivePlan(activePlan);
  853. fifthJoinObserver.RemoveActivePlan(activePlan);
  854. sixthJoinObserver.RemoveActivePlan(activePlan);
  855. seventhJoinObserver.RemoveActivePlan(activePlan);
  856. eighthJoinObserver.RemoveActivePlan(activePlan);
  857. ninthJoinObserver.RemoveActivePlan(activePlan);
  858. tenthJoinObserver.RemoveActivePlan(activePlan);
  859. eleventhJoinObserver.RemoveActivePlan(activePlan);
  860. twelfthJoinObserver.RemoveActivePlan(activePlan);
  861. thirteenthJoinObserver.RemoveActivePlan(activePlan);
  862. fourteenthJoinObserver.RemoveActivePlan(activePlan);
  863. deactivate(activePlan);
  864. });
  865. firstJoinObserver.AddActivePlan(activePlan);
  866. secondJoinObserver.AddActivePlan(activePlan);
  867. thirdJoinObserver.AddActivePlan(activePlan);
  868. fourthJoinObserver.AddActivePlan(activePlan);
  869. fifthJoinObserver.AddActivePlan(activePlan);
  870. sixthJoinObserver.AddActivePlan(activePlan);
  871. seventhJoinObserver.AddActivePlan(activePlan);
  872. eighthJoinObserver.AddActivePlan(activePlan);
  873. ninthJoinObserver.AddActivePlan(activePlan);
  874. tenthJoinObserver.AddActivePlan(activePlan);
  875. eleventhJoinObserver.AddActivePlan(activePlan);
  876. twelfthJoinObserver.AddActivePlan(activePlan);
  877. thirteenthJoinObserver.AddActivePlan(activePlan);
  878. fourteenthJoinObserver.AddActivePlan(activePlan);
  879. return activePlan;
  880. }
  881. }
  882. internal class Plan<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, TResult> : Plan<TResult>
  883. {
  884. internal Pattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15> Expression { get; private set; }
  885. internal Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, TResult> Selector { get; private set; }
  886. internal Plan(Pattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15> expression,
  887. Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, TResult> selector)
  888. {
  889. Expression = expression;
  890. Selector = selector;
  891. }
  892. internal override ActivePlan Activate(Dictionary<object, IJoinObserver> externalSubscriptions,
  893. IObserver<TResult> observer, Action<ActivePlan> deactivate)
  894. {
  895. var onError = new Action<Exception>(observer.OnError);
  896. var firstJoinObserver = CreateObserver(externalSubscriptions, Expression.First, onError);
  897. var secondJoinObserver = CreateObserver(externalSubscriptions, Expression.Second, onError);
  898. var thirdJoinObserver = CreateObserver(externalSubscriptions, Expression.Third, onError);
  899. var fourthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fourth, onError);
  900. var fifthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fifth, onError);
  901. var sixthJoinObserver = CreateObserver(externalSubscriptions, Expression.Sixth, onError);
  902. var seventhJoinObserver = CreateObserver(externalSubscriptions, Expression.Seventh, onError);
  903. var eighthJoinObserver = CreateObserver(externalSubscriptions, Expression.Eighth, onError);
  904. var ninthJoinObserver = CreateObserver(externalSubscriptions, Expression.Ninth, onError);
  905. var tenthJoinObserver = CreateObserver(externalSubscriptions, Expression.Tenth, onError);
  906. var eleventhJoinObserver = CreateObserver(externalSubscriptions, Expression.Eleventh, onError);
  907. var twelfthJoinObserver = CreateObserver(externalSubscriptions, Expression.Twelfth, onError);
  908. var thirteenthJoinObserver = CreateObserver(externalSubscriptions, Expression.Thirteenth, onError);
  909. var fourteenthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fourteenth, onError);
  910. var fifteenthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fifteenth, onError);
  911. var activePlan = default(ActivePlan<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>);
  912. activePlan = new ActivePlan<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>(
  913. firstJoinObserver, secondJoinObserver, thirdJoinObserver,
  914. fourthJoinObserver, fifthJoinObserver, sixthJoinObserver,
  915. seventhJoinObserver, eighthJoinObserver, ninthJoinObserver,
  916. tenthJoinObserver, eleventhJoinObserver,
  917. twelfthJoinObserver, thirteenthJoinObserver,
  918. fourteenthJoinObserver, fifteenthJoinObserver,
  919. (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth) =>
  920. {
  921. var result = default(TResult);
  922. try
  923. {
  924. result = Selector(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth);
  925. }
  926. catch (Exception exception)
  927. {
  928. observer.OnError(exception);
  929. return;
  930. }
  931. observer.OnNext(result);
  932. },
  933. () =>
  934. {
  935. firstJoinObserver.RemoveActivePlan(activePlan);
  936. secondJoinObserver.RemoveActivePlan(activePlan);
  937. thirdJoinObserver.RemoveActivePlan(activePlan);
  938. fourthJoinObserver.RemoveActivePlan(activePlan);
  939. fifthJoinObserver.RemoveActivePlan(activePlan);
  940. sixthJoinObserver.RemoveActivePlan(activePlan);
  941. seventhJoinObserver.RemoveActivePlan(activePlan);
  942. eighthJoinObserver.RemoveActivePlan(activePlan);
  943. ninthJoinObserver.RemoveActivePlan(activePlan);
  944. tenthJoinObserver.RemoveActivePlan(activePlan);
  945. eleventhJoinObserver.RemoveActivePlan(activePlan);
  946. twelfthJoinObserver.RemoveActivePlan(activePlan);
  947. thirteenthJoinObserver.RemoveActivePlan(activePlan);
  948. fourteenthJoinObserver.RemoveActivePlan(activePlan);
  949. fifteenthJoinObserver.RemoveActivePlan(activePlan);
  950. deactivate(activePlan);
  951. });
  952. firstJoinObserver.AddActivePlan(activePlan);
  953. secondJoinObserver.AddActivePlan(activePlan);
  954. thirdJoinObserver.AddActivePlan(activePlan);
  955. fourthJoinObserver.AddActivePlan(activePlan);
  956. fifthJoinObserver.AddActivePlan(activePlan);
  957. sixthJoinObserver.AddActivePlan(activePlan);
  958. seventhJoinObserver.AddActivePlan(activePlan);
  959. eighthJoinObserver.AddActivePlan(activePlan);
  960. ninthJoinObserver.AddActivePlan(activePlan);
  961. tenthJoinObserver.AddActivePlan(activePlan);
  962. eleventhJoinObserver.AddActivePlan(activePlan);
  963. twelfthJoinObserver.AddActivePlan(activePlan);
  964. thirteenthJoinObserver.AddActivePlan(activePlan);
  965. fourteenthJoinObserver.AddActivePlan(activePlan);
  966. fifteenthJoinObserver.AddActivePlan(activePlan);
  967. return activePlan;
  968. }
  969. }
  970. internal class Plan<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, TResult> : Plan<TResult>
  971. {
  972. internal Pattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16> Expression { get; private set; }
  973. internal Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, TResult> Selector { get; private set; }
  974. internal Plan(Pattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16> expression,
  975. Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, TResult> selector)
  976. {
  977. Expression = expression;
  978. Selector = selector;
  979. }
  980. internal override ActivePlan Activate(Dictionary<object, IJoinObserver> externalSubscriptions,
  981. IObserver<TResult> observer, Action<ActivePlan> deactivate)
  982. {
  983. var onError = new Action<Exception>(observer.OnError);
  984. var firstJoinObserver = CreateObserver(externalSubscriptions, Expression.First, onError);
  985. var secondJoinObserver = CreateObserver(externalSubscriptions, Expression.Second, onError);
  986. var thirdJoinObserver = CreateObserver(externalSubscriptions, Expression.Third, onError);
  987. var fourthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fourth, onError);
  988. var fifthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fifth, onError);
  989. var sixthJoinObserver = CreateObserver(externalSubscriptions, Expression.Sixth, onError);
  990. var seventhJoinObserver = CreateObserver(externalSubscriptions, Expression.Seventh, onError);
  991. var eighthJoinObserver = CreateObserver(externalSubscriptions, Expression.Eighth, onError);
  992. var ninthJoinObserver = CreateObserver(externalSubscriptions, Expression.Ninth, onError);
  993. var tenthJoinObserver = CreateObserver(externalSubscriptions, Expression.Tenth, onError);
  994. var eleventhJoinObserver = CreateObserver(externalSubscriptions, Expression.Eleventh, onError);
  995. var twelfthJoinObserver = CreateObserver(externalSubscriptions, Expression.Twelfth, onError);
  996. var thirteenthJoinObserver = CreateObserver(externalSubscriptions, Expression.Thirteenth, onError);
  997. var fourteenthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fourteenth, onError);
  998. var fifteenthJoinObserver = CreateObserver(externalSubscriptions, Expression.Fifteenth, onError);
  999. var sixteenthJoinObserver = CreateObserver(externalSubscriptions, Expression.Sixteenth, onError);
  1000. var activePlan = default(ActivePlan<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>);
  1001. activePlan = new ActivePlan<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>(
  1002. firstJoinObserver, secondJoinObserver, thirdJoinObserver,
  1003. fourthJoinObserver, fifthJoinObserver, sixthJoinObserver,
  1004. seventhJoinObserver, eighthJoinObserver, ninthJoinObserver,
  1005. tenthJoinObserver, eleventhJoinObserver,
  1006. twelfthJoinObserver, thirteenthJoinObserver,
  1007. fourteenthJoinObserver, fifteenthJoinObserver,
  1008. sixteenthJoinObserver,
  1009. (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth, sixteenth) =>
  1010. {
  1011. var result = default(TResult);
  1012. try
  1013. {
  1014. result = Selector(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth, sixteenth);
  1015. }
  1016. catch (Exception exception)
  1017. {
  1018. observer.OnError(exception);
  1019. return;
  1020. }
  1021. observer.OnNext(result);
  1022. },
  1023. () =>
  1024. {
  1025. firstJoinObserver.RemoveActivePlan(activePlan);
  1026. secondJoinObserver.RemoveActivePlan(activePlan);
  1027. thirdJoinObserver.RemoveActivePlan(activePlan);
  1028. fourthJoinObserver.RemoveActivePlan(activePlan);
  1029. fifthJoinObserver.RemoveActivePlan(activePlan);
  1030. sixthJoinObserver.RemoveActivePlan(activePlan);
  1031. seventhJoinObserver.RemoveActivePlan(activePlan);
  1032. eighthJoinObserver.RemoveActivePlan(activePlan);
  1033. ninthJoinObserver.RemoveActivePlan(activePlan);
  1034. tenthJoinObserver.RemoveActivePlan(activePlan);
  1035. eleventhJoinObserver.RemoveActivePlan(activePlan);
  1036. twelfthJoinObserver.RemoveActivePlan(activePlan);
  1037. thirteenthJoinObserver.RemoveActivePlan(activePlan);
  1038. fourteenthJoinObserver.RemoveActivePlan(activePlan);
  1039. fifteenthJoinObserver.RemoveActivePlan(activePlan);
  1040. sixteenthJoinObserver.RemoveActivePlan(activePlan);
  1041. deactivate(activePlan);
  1042. });
  1043. firstJoinObserver.AddActivePlan(activePlan);
  1044. secondJoinObserver.AddActivePlan(activePlan);
  1045. thirdJoinObserver.AddActivePlan(activePlan);
  1046. fourthJoinObserver.AddActivePlan(activePlan);
  1047. fifthJoinObserver.AddActivePlan(activePlan);
  1048. sixthJoinObserver.AddActivePlan(activePlan);
  1049. seventhJoinObserver.AddActivePlan(activePlan);
  1050. eighthJoinObserver.AddActivePlan(activePlan);
  1051. ninthJoinObserver.AddActivePlan(activePlan);
  1052. tenthJoinObserver.AddActivePlan(activePlan);
  1053. eleventhJoinObserver.AddActivePlan(activePlan);
  1054. twelfthJoinObserver.AddActivePlan(activePlan);
  1055. thirteenthJoinObserver.AddActivePlan(activePlan);
  1056. fourteenthJoinObserver.AddActivePlan(activePlan);
  1057. fifteenthJoinObserver.AddActivePlan(activePlan);
  1058. sixteenthJoinObserver.AddActivePlan(activePlan);
  1059. return activePlan;
  1060. }
  1061. }
  1062. #endif
  1063. }