QueryLanguage.Async.cs 75 KB


  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Reactive.Concurrency;
  3. using System.Reactive.Disposables;
  4. using System.Reactive.Subjects;
  5. #if !NO_TPL
  6. using System.Reactive.Threading.Tasks;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  9. #endif
  10. namespace System.Reactive.Linq
  11. {
  12. internal partial class QueryLanguage
  13. {
  14. #region FromAsyncPattern
  15. #region Func
  16. public virtual Func<IObservable<TResult>> FromAsyncPattern<TResult>(Func<AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  17. {
  18. return () =>
  19. {
  20. var subject = new AsyncSubject<TResult>();
  21. try
  22. {
  23. begin(iar =>
  24. {
  25. // Note: Even if the callback completes synchronously, outgoing On* calls
  26. // cannot throw in user code since there can't be any subscribers
  27. // to the AsyncSubject yet. Therefore, there is no need to protect
  28. // against exceptions that'd be caught below and sent (incorrectly)
  29. // into the Observable.Throw sequence being constructed.
  30. TResult result;
  31. try
  32. {
  33. result = end(iar);
  34. }
  35. catch (Exception exception)
  36. {
  37. subject.OnError(exception);
  38. return;
  39. }
  40. subject.OnNext(result);
  41. subject.OnCompleted();
  42. }, null);
  43. }
  44. catch (Exception exception)
  45. {
  46. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  47. }
  48. return subject.AsObservable();
  49. };
  50. }
  51. public virtual Func<T1, IObservable<TResult>> FromAsyncPattern<T1, TResult>(Func<T1, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  52. {
  53. return x =>
  54. {
  55. var subject = new AsyncSubject<TResult>();
  56. try
  57. {
  58. begin(x, iar =>
  59. {
  60. // See remark on FromAsyncPattern<TResult>.
  61. TResult result;
  62. try
  63. {
  64. result = end(iar);
  65. }
  66. catch (Exception exception)
  67. {
  68. subject.OnError(exception);
  69. return;
  70. }
  71. subject.OnNext(result);
  72. subject.OnCompleted();
  73. }, null);
  74. }
  75. catch (Exception exception)
  76. {
  77. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  78. }
  79. return subject.AsObservable();
  80. };
  81. }
  82. public virtual Func<T1, T2, IObservable<TResult>> FromAsyncPattern<T1, T2, TResult>(Func<T1, T2, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  83. {
  84. return (x, y) =>
  85. {
  86. var subject = new AsyncSubject<TResult>();
  87. try
  88. {
  89. begin(x, y, iar =>
  90. {
  91. // See remark on FromAsyncPattern<TResult>.
  92. TResult result;
  93. try
  94. {
  95. result = end(iar);
  96. }
  97. catch (Exception exception)
  98. {
  99. subject.OnError(exception);
  100. return;
  101. }
  102. subject.OnNext(result);
  103. subject.OnCompleted();
  104. }, null);
  105. }
  106. catch (Exception exception)
  107. {
  108. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  109. }
  110. return subject.AsObservable();
  111. };
  112. }
  113. #if !NO_LARGEARITY
  114. public virtual Func<T1, T2, T3, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, TResult>(Func<T1, T2, T3, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  115. {
  116. return (x, y, z) =>
  117. {
  118. var subject = new AsyncSubject<TResult>();
  119. try
  120. {
  121. begin(x, y, z, iar =>
  122. {
  123. // See remark on FromAsyncPattern<TResult>.
  124. TResult result;
  125. try
  126. {
  127. result = end(iar);
  128. }
  129. catch (Exception exception)
  130. {
  131. subject.OnError(exception);
  132. return;
  133. }
  134. subject.OnNext(result);
  135. subject.OnCompleted();
  136. }, null);
  137. }
  138. catch (Exception exception)
  139. {
  140. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  141. }
  142. return subject.AsObservable();
  143. };
  144. }
  145. public virtual Func<T1, T2, T3, T4, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, TResult>(Func<T1, T2, T3, T4, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  146. {
  147. return (x, y, z, a) =>
  148. {
  149. var subject = new AsyncSubject<TResult>();
  150. try
  151. {
  152. begin(x, y, z, a, iar =>
  153. {
  154. // See remark on FromAsyncPattern<TResult>.
  155. TResult result;
  156. try
  157. {
  158. result = end(iar);
  159. }
  160. catch (Exception exception)
  161. {
  162. subject.OnError(exception);
  163. return;
  164. }
  165. subject.OnNext(result);
  166. subject.OnCompleted();
  167. }, null);
  168. }
  169. catch (Exception exception)
  170. {
  171. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  172. }
  173. return subject.AsObservable();
  174. };
  175. }
  176. public virtual Func<T1, T2, T3, T4, T5, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, TResult>(Func<T1, T2, T3, T4, T5, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  177. {
  178. return (x, y, z, a, b) =>
  179. {
  180. var subject = new AsyncSubject<TResult>();
  181. try
  182. {
  183. begin(x, y, z, a, b, iar =>
  184. {
  185. // See remark on FromAsyncPattern<TResult>.
  186. TResult result;
  187. try
  188. {
  189. result = end(iar);
  190. }
  191. catch (Exception exception)
  192. {
  193. subject.OnError(exception);
  194. return;
  195. }
  196. subject.OnNext(result);
  197. subject.OnCompleted();
  198. }, null);
  199. }
  200. catch (Exception exception)
  201. {
  202. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  203. }
  204. return subject.AsObservable();
  205. };
  206. }
  207. public virtual Func<T1, T2, T3, T4, T5, T6, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, TResult>(Func<T1, T2, T3, T4, T5, T6, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  208. {
  209. return (x, y, z, a, b, c) =>
  210. {
  211. var subject = new AsyncSubject<TResult>();
  212. try
  213. {
  214. begin(x, y, z, a, b, c, iar =>
  215. {
  216. // See remark on FromAsyncPattern<TResult>.
  217. TResult result;
  218. try
  219. {
  220. result = end(iar);
  221. }
  222. catch (Exception exception)
  223. {
  224. subject.OnError(exception);
  225. return;
  226. }
  227. subject.OnNext(result);
  228. subject.OnCompleted();
  229. }, null);
  230. }
  231. catch (Exception exception)
  232. {
  233. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  234. }
  235. return subject.AsObservable();
  236. };
  237. }
  238. public virtual Func<T1, T2, T3, T4, T5, T6, T7, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  239. {
  240. return (x, y, z, a, b, c, d) =>
  241. {
  242. var subject = new AsyncSubject<TResult>();
  243. try
  244. {
  245. begin(x, y, z, a, b, c, d, iar =>
  246. {
  247. // See remark on FromAsyncPattern<TResult>.
  248. TResult result;
  249. try
  250. {
  251. result = end(iar);
  252. }
  253. catch (Exception exception)
  254. {
  255. subject.OnError(exception);
  256. return;
  257. }
  258. subject.OnNext(result);
  259. subject.OnCompleted();
  260. }, null);
  261. }
  262. catch (Exception exception)
  263. {
  264. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  265. }
  266. return subject.AsObservable();
  267. };
  268. }
  269. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  270. {
  271. return (x, y, z, a, b, c, d, e) =>
  272. {
  273. var subject = new AsyncSubject<TResult>();
  274. try
  275. {
  276. begin(x, y, z, a, b, c, d, e, iar =>
  277. {
  278. // See remark on FromAsyncPattern<TResult>.
  279. TResult result;
  280. try
  281. {
  282. result = end(iar);
  283. }
  284. catch (Exception exception)
  285. {
  286. subject.OnError(exception);
  287. return;
  288. }
  289. subject.OnNext(result);
  290. subject.OnCompleted();
  291. }, null);
  292. }
  293. catch (Exception exception)
  294. {
  295. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  296. }
  297. return subject.AsObservable();
  298. };
  299. }
  300. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  301. {
  302. return (x, y, z, a, b, c, d, e, f) =>
  303. {
  304. var subject = new AsyncSubject<TResult>();
  305. try
  306. {
  307. begin(x, y, z, a, b, c, d, e, f, iar =>
  308. {
  309. // See remark on FromAsyncPattern<TResult>.
  310. TResult result;
  311. try
  312. {
  313. result = end(iar);
  314. }
  315. catch (Exception exception)
  316. {
  317. subject.OnError(exception);
  318. return;
  319. }
  320. subject.OnNext(result);
  321. subject.OnCompleted();
  322. }, null);
  323. }
  324. catch (Exception exception)
  325. {
  326. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  327. }
  328. return subject.AsObservable();
  329. };
  330. }
  331. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  332. {
  333. return (x, y, z, a, b, c, d, e, f, g) =>
  334. {
  335. var subject = new AsyncSubject<TResult>();
  336. try
  337. {
  338. begin(x, y, z, a, b, c, d, e, f, g, iar =>
  339. {
  340. // See remark on FromAsyncPattern<TResult>.
  341. TResult result;
  342. try
  343. {
  344. result = end(iar);
  345. }
  346. catch (Exception exception)
  347. {
  348. subject.OnError(exception);
  349. return;
  350. }
  351. subject.OnNext(result);
  352. subject.OnCompleted();
  353. }, null);
  354. }
  355. catch (Exception exception)
  356. {
  357. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  358. }
  359. return subject.AsObservable();
  360. };
  361. }
  362. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  363. {
  364. return (x, y, z, a, b, c, d, e, f, g, h) =>
  365. {
  366. var subject = new AsyncSubject<TResult>();
  367. try
  368. {
  369. begin(x, y, z, a, b, c, d, e, f, g, h, iar =>
  370. {
  371. // See remark on FromAsyncPattern<TResult>.
  372. TResult result;
  373. try
  374. {
  375. result = end(iar);
  376. }
  377. catch (Exception exception)
  378. {
  379. subject.OnError(exception);
  380. return;
  381. }
  382. subject.OnNext(result);
  383. subject.OnCompleted();
  384. }, null);
  385. }
  386. catch (Exception exception)
  387. {
  388. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  389. }
  390. return subject.AsObservable();
  391. };
  392. }
  393. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  394. {
  395. return (x, y, z, a, b, c, d, e, f, g, h, i) =>
  396. {
  397. var subject = new AsyncSubject<TResult>();
  398. try
  399. {
  400. begin(x, y, z, a, b, c, d, e, f, g, h, i, iar =>
  401. {
  402. // See remark on FromAsyncPattern<TResult>.
  403. TResult result;
  404. try
  405. {
  406. result = end(iar);
  407. }
  408. catch (Exception exception)
  409. {
  410. subject.OnError(exception);
  411. return;
  412. }
  413. subject.OnNext(result);
  414. subject.OnCompleted();
  415. }, null);
  416. }
  417. catch (Exception exception)
  418. {
  419. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  420. }
  421. return subject.AsObservable();
  422. };
  423. }
  424. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  425. {
  426. return (x, y, z, a, b, c, d, e, f, g, h, i, j) =>
  427. {
  428. var subject = new AsyncSubject<TResult>();
  429. try
  430. {
  431. begin(x, y, z, a, b, c, d, e, f, g, h, i, j, iar =>
  432. {
  433. // See remark on FromAsyncPattern<TResult>.
  434. TResult result;
  435. try
  436. {
  437. result = end(iar);
  438. }
  439. catch (Exception exception)
  440. {
  441. subject.OnError(exception);
  442. return;
  443. }
  444. subject.OnNext(result);
  445. subject.OnCompleted();
  446. }, null);
  447. }
  448. catch (Exception exception)
  449. {
  450. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  451. }
  452. return subject.AsObservable();
  453. };
  454. }
  455. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  456. {
  457. return (x, y, z, a, b, c, d, e, f, g, h, i, j, k) =>
  458. {
  459. var subject = new AsyncSubject<TResult>();
  460. try
  461. {
  462. begin(x, y, z, a, b, c, d, e, f, g, h, i, j, k, iar =>
  463. {
  464. // See remark on FromAsyncPattern<TResult>.
  465. TResult result;
  466. try
  467. {
  468. result = end(iar);
  469. }
  470. catch (Exception exception)
  471. {
  472. subject.OnError(exception);
  473. return;
  474. }
  475. subject.OnNext(result);
  476. subject.OnCompleted();
  477. }, null);
  478. }
  479. catch (Exception exception)
  480. {
  481. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  482. }
  483. return subject.AsObservable();
  484. };
  485. }
  486. #endif
  487. #endregion
  488. #region Action
  489. public virtual Func<IObservable<Unit>> FromAsyncPattern(Func<AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  490. {
  491. return FromAsyncPattern(begin, iar =>
  492. {
  493. end(iar);
  494. return Unit.Default;
  495. });
  496. }
  497. public virtual Func<T1, IObservable<Unit>> FromAsyncPattern<T1>(Func<T1, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  498. {
  499. return FromAsyncPattern(begin, iar =>
  500. {
  501. end(iar);
  502. return Unit.Default;
  503. });
  504. }
  505. public virtual Func<T1, T2, IObservable<Unit>> FromAsyncPattern<T1, T2>(Func<T1, T2, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  506. {
  507. return FromAsyncPattern(begin, iar =>
  508. {
  509. end(iar);
  510. return Unit.Default;
  511. });
  512. }
  513. #if !NO_LARGEARITY
  514. public virtual Func<T1, T2, T3, IObservable<Unit>> FromAsyncPattern<T1, T2, T3>(Func<T1, T2, T3, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  515. {
  516. return FromAsyncPattern(begin, iar =>
  517. {
  518. end(iar);
  519. return Unit.Default;
  520. });
  521. }
  522. public virtual Func<T1, T2, T3, T4, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4>(Func<T1, T2, T3, T4, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  523. {
  524. return FromAsyncPattern(begin, iar =>
  525. {
  526. end(iar);
  527. return Unit.Default;
  528. });
  529. }
  530. public virtual Func<T1, T2, T3, T4, T5, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5>(Func<T1, T2, T3, T4, T5, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  531. {
  532. return FromAsyncPattern(begin, iar =>
  533. {
  534. end(iar);
  535. return Unit.Default;
  536. });
  537. }
  538. public virtual Func<T1, T2, T3, T4, T5, T6, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6>(Func<T1, T2, T3, T4, T5, T6, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  539. {
  540. return FromAsyncPattern(begin, iar =>
  541. {
  542. end(iar);
  543. return Unit.Default;
  544. });
  545. }
  546. public virtual Func<T1, T2, T3, T4, T5, T6, T7, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7>(Func<T1, T2, T3, T4, T5, T6, T7, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  547. {
  548. return FromAsyncPattern(begin, iar =>
  549. {
  550. end(iar);
  551. return Unit.Default;
  552. });
  553. }
  554. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8>(Func<T1, T2, T3, T4, T5, T6, T7, T8, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  555. {
  556. return FromAsyncPattern(begin, iar =>
  557. {
  558. end(iar);
  559. return Unit.Default;
  560. });
  561. }
  562. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  563. {
  564. return FromAsyncPattern(begin, iar =>
  565. {
  566. end(iar);
  567. return Unit.Default;
  568. });
  569. }
  570. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  571. {
  572. return FromAsyncPattern(begin, iar =>
  573. {
  574. end(iar);
  575. return Unit.Default;
  576. });
  577. }
  578. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  579. {
  580. return FromAsyncPattern(begin, iar =>
  581. {
  582. end(iar);
  583. return Unit.Default;
  584. });
  585. }
  586. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  587. {
  588. return FromAsyncPattern(begin, iar =>
  589. {
  590. end(iar);
  591. return Unit.Default;
  592. });
  593. }
  594. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  595. {
  596. return FromAsyncPattern(begin, iar =>
  597. {
  598. end(iar);
  599. return Unit.Default;
  600. });
  601. }
  602. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  603. {
  604. return FromAsyncPattern(begin, iar =>
  605. {
  606. end(iar);
  607. return Unit.Default;
  608. });
  609. }
  610. #endif
  611. #endregion
  612. #endregion
  613. #region Start[Async]
  614. #region Func
  615. public virtual IObservable<TSource> Start<TSource>(Func<TSource> function)
  616. {
  617. return ToAsync(function)();
  618. }
  619. public virtual IObservable<TSource> Start<TSource>(Func<TSource> function, IScheduler scheduler)
  620. {
  621. return ToAsync(function, scheduler)();
  622. }
  623. #if !NO_TPL
  624. public virtual IObservable<TSource> StartAsync<TSource>(Func<Task<TSource>> functionAsync)
  625. {
  626. return StartAsyncImpl(functionAsync, null);
  627. }
  628. public virtual IObservable<TSource> StartAsync<TSource>(Func<Task<TSource>> functionAsync, IScheduler scheduler)
  629. {
  630. return StartAsyncImpl(functionAsync, scheduler);
  631. }
  632. private IObservable<TSource> StartAsyncImpl<TSource>(Func<Task<TSource>> functionAsync, IScheduler scheduler)
  633. {
  634. var task = default(Task<TSource>);
  635. try
  636. {
  637. task = functionAsync();
  638. }
  639. catch (Exception exception)
  640. {
  641. return Throw<TSource>(exception);
  642. }
  643. if (scheduler != null)
  644. {
  645. return task.ToObservable(scheduler);
  646. }
  647. else
  648. {
  649. return task.ToObservable();
  650. }
  651. }
  652. public virtual IObservable<TSource> StartAsync<TSource>(Func<CancellationToken, Task<TSource>> functionAsync)
  653. {
  654. return StartAsyncImpl(functionAsync, null);
  655. }
  656. public virtual IObservable<TSource> StartAsync<TSource>(Func<CancellationToken, Task<TSource>> functionAsync, IScheduler scheduler)
  657. {
  658. return StartAsyncImpl(functionAsync, scheduler);
  659. }
  660. private IObservable<TSource> StartAsyncImpl<TSource>(Func<CancellationToken, Task<TSource>> functionAsync, IScheduler scheduler)
  661. {
  662. var cancellable = new CancellationDisposable();
  663. var task = default(Task<TSource>);
  664. try
  665. {
  666. task = functionAsync(cancellable.Token);
  667. }
  668. catch (Exception exception)
  669. {
  670. return Throw<TSource>(exception);
  671. }
  672. var result = default(IObservable<TSource>);
  673. if (scheduler != null)
  674. {
  675. result = task.ToObservable(scheduler);
  676. }
  677. else
  678. {
  679. result = task.ToObservable();
  680. }
  681. return new AnonymousObservable<TSource>(observer =>
  682. {
  683. //
  684. // [OK] Use of unsafe Subscribe: result is an AsyncSubject<TSource>.
  685. //
  686. var subscription = result.Subscribe/*Unsafe*/(observer);
  687. return StableCompositeDisposable.Create(cancellable, subscription);
  688. });
  689. }
  690. #endif
  691. #endregion
  692. #region Action
  693. public virtual IObservable<Unit> Start(Action action)
  694. {
  695. return ToAsync(action, SchedulerDefaults.AsyncConversions)();
  696. }
  697. public virtual IObservable<Unit> Start(Action action, IScheduler scheduler)
  698. {
  699. return ToAsync(action, scheduler)();
  700. }
  701. #if !NO_TPL
  702. public virtual IObservable<Unit> StartAsync(Func<Task> actionAsync)
  703. {
  704. return StartAsyncImpl(actionAsync, null);
  705. }
  706. public virtual IObservable<Unit> StartAsync(Func<Task> actionAsync, IScheduler scheduler)
  707. {
  708. return StartAsyncImpl(actionAsync, scheduler);
  709. }
  710. private IObservable<Unit> StartAsyncImpl(Func<Task> actionAsync, IScheduler scheduler)
  711. {
  712. var task = default(Task);
  713. try
  714. {
  715. task = actionAsync();
  716. }
  717. catch (Exception exception)
  718. {
  719. return Throw<Unit>(exception);
  720. }
  721. if (scheduler != null)
  722. {
  723. return task.ToObservable(scheduler);
  724. }
  725. else
  726. {
  727. return task.ToObservable();
  728. }
  729. }
  730. public virtual IObservable<Unit> StartAsync(Func<CancellationToken, Task> actionAsync)
  731. {
  732. return StartAsyncImpl(actionAsync, null);
  733. }
  734. public virtual IObservable<Unit> StartAsync(Func<CancellationToken, Task> actionAsync, IScheduler scheduler)
  735. {
  736. return StartAsyncImpl(actionAsync, scheduler);
  737. }
  738. private IObservable<Unit> StartAsyncImpl(Func<CancellationToken, Task> actionAsync, IScheduler scheduler)
  739. {
  740. var cancellable = new CancellationDisposable();
  741. var task = default(Task);
  742. try
  743. {
  744. task = actionAsync(cancellable.Token);
  745. }
  746. catch (Exception exception)
  747. {
  748. return Throw<Unit>(exception);
  749. }
  750. var result = default(IObservable<Unit>);
  751. if (scheduler != null)
  752. {
  753. result = task.ToObservable(scheduler);
  754. }
  755. else
  756. {
  757. result = task.ToObservable();
  758. }
  759. return new AnonymousObservable<Unit>(observer =>
  760. {
  761. //
  762. // [OK] Use of unsafe Subscribe: result is an AsyncSubject<TSource>.
  763. //
  764. var subscription = result.Subscribe/*Unsafe*/(observer);
  765. return StableCompositeDisposable.Create(cancellable, subscription);
  766. });
  767. }
  768. #endif
  769. #endregion
  770. #endregion
  771. #region FromAsync
  772. #if !NO_TPL
  773. #region Func
  774. public virtual IObservable<TResult> FromAsync<TResult>(Func<Task<TResult>> functionAsync)
  775. {
  776. return Defer(() => StartAsync(functionAsync));
  777. }
  778. public virtual IObservable<TResult> FromAsync<TResult>(Func<CancellationToken, Task<TResult>> functionAsync)
  779. {
  780. return Defer(() => StartAsync(functionAsync));
  781. }
  782. public virtual IObservable<TResult> FromAsync<TResult>(Func<Task<TResult>> functionAsync, IScheduler scheduler)
  783. {
  784. return Defer(() => StartAsync(functionAsync, scheduler));
  785. }
  786. public virtual IObservable<TResult> FromAsync<TResult>(Func<CancellationToken, Task<TResult>> functionAsync, IScheduler scheduler)
  787. {
  788. return Defer(() => StartAsync(functionAsync, scheduler));
  789. }
  790. #endregion
  791. #region Action
  792. public virtual IObservable<Unit> FromAsync(Func<Task> actionAsync)
  793. {
  794. return Defer(() => StartAsync(actionAsync));
  795. }
  796. public virtual IObservable<Unit> FromAsync(Func<CancellationToken, Task> actionAsync)
  797. {
  798. return Defer(() => StartAsync(actionAsync));
  799. }
  800. public virtual IObservable<Unit> FromAsync(Func<Task> actionAsync, IScheduler scheduler)
  801. {
  802. return Defer(() => StartAsync(actionAsync, scheduler));
  803. }
  804. public virtual IObservable<Unit> FromAsync(Func<CancellationToken, Task> actionAsync, IScheduler scheduler)
  805. {
  806. return Defer(() => StartAsync(actionAsync, scheduler));
  807. }
  808. #endregion
  809. #endif
  810. #endregion
  811. #region ToAsync
  812. #region Func
  813. public virtual Func<IObservable<TResult>> ToAsync<TResult>(Func<TResult> function)
  814. {
  815. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  816. }
  817. public virtual Func<IObservable<TResult>> ToAsync<TResult>(Func<TResult> function, IScheduler scheduler)
  818. {
  819. return () =>
  820. {
  821. var subject = new AsyncSubject<TResult>();
  822. scheduler.Schedule(() =>
  823. {
  824. var result = default(TResult);
  825. try
  826. {
  827. result = function();
  828. }
  829. catch (Exception exception)
  830. {
  831. subject.OnError(exception);
  832. return;
  833. }
  834. subject.OnNext(result);
  835. subject.OnCompleted();
  836. });
  837. return subject.AsObservable();
  838. };
  839. }
  840. public virtual Func<T, IObservable<TResult>> ToAsync<T, TResult>(Func<T, TResult> function)
  841. {
  842. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  843. }
  844. public virtual Func<T, IObservable<TResult>> ToAsync<T, TResult>(Func<T, TResult> function, IScheduler scheduler)
  845. {
  846. return (first) =>
  847. {
  848. var subject = new AsyncSubject<TResult>();
  849. scheduler.Schedule(() =>
  850. {
  851. var result = default(TResult);
  852. try
  853. {
  854. result = function(first);
  855. }
  856. catch (Exception exception)
  857. {
  858. subject.OnError(exception);
  859. return;
  860. }
  861. subject.OnNext(result);
  862. subject.OnCompleted();
  863. });
  864. return subject.AsObservable();
  865. };
  866. }
  867. public virtual Func<T1, T2, IObservable<TResult>> ToAsync<T1, T2, TResult>(Func<T1, T2, TResult> function)
  868. {
  869. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  870. }
  871. public virtual Func<T1, T2, IObservable<TResult>> ToAsync<T1, T2, TResult>(Func<T1, T2, TResult> function, IScheduler scheduler)
  872. {
  873. return (first, second) =>
  874. {
  875. var subject = new AsyncSubject<TResult>();
  876. scheduler.Schedule(() =>
  877. {
  878. var result = default(TResult);
  879. try
  880. {
  881. result = function(first, second);
  882. }
  883. catch (Exception exception)
  884. {
  885. subject.OnError(exception);
  886. return;
  887. }
  888. subject.OnNext(result);
  889. subject.OnCompleted();
  890. });
  891. return subject.AsObservable();
  892. };
  893. }
  894. public virtual Func<T1, T2, T3, IObservable<TResult>> ToAsync<T1, T2, T3, TResult>(Func<T1, T2, T3, TResult> function)
  895. {
  896. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  897. }
  898. public virtual Func<T1, T2, T3, IObservable<TResult>> ToAsync<T1, T2, T3, TResult>(Func<T1, T2, T3, TResult> function, IScheduler scheduler)
  899. {
  900. return (first, second, third) =>
  901. {
  902. var subject = new AsyncSubject<TResult>();
  903. scheduler.Schedule(() =>
  904. {
  905. var result = default(TResult);
  906. try
  907. {
  908. result = function(first, second, third);
  909. }
  910. catch (Exception exception)
  911. {
  912. subject.OnError(exception);
  913. return;
  914. }
  915. subject.OnNext(result);
  916. subject.OnCompleted();
  917. });
  918. return subject.AsObservable();
  919. };
  920. }
  921. public virtual Func<T1, T2, T3, T4, IObservable<TResult>> ToAsync<T1, T2, T3, T4, TResult>(Func<T1, T2, T3, T4, TResult> function)
  922. {
  923. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  924. }
  925. public virtual Func<T1, T2, T3, T4, IObservable<TResult>> ToAsync<T1, T2, T3, T4, TResult>(Func<T1, T2, T3, T4, TResult> function, IScheduler scheduler)
  926. {
  927. return (first, second, third, fourth) =>
  928. {
  929. var subject = new AsyncSubject<TResult>();
  930. scheduler.Schedule(() =>
  931. {
  932. var result = default(TResult);
  933. try
  934. {
  935. result = function(first, second, third, fourth);
  936. }
  937. catch (Exception exception)
  938. {
  939. subject.OnError(exception);
  940. return;
  941. }
  942. subject.OnNext(result);
  943. subject.OnCompleted();
  944. });
  945. return subject.AsObservable();
  946. };
  947. }
  948. #if !NO_LARGEARITY
  949. public virtual Func<T1, T2, T3, T4, T5, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, TResult>(Func<T1, T2, T3, T4, T5, TResult> function)
  950. {
  951. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  952. }
  953. public virtual Func<T1, T2, T3, T4, T5, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, TResult>(Func<T1, T2, T3, T4, T5, TResult> function, IScheduler scheduler)
  954. {
  955. return (first, second, third, fourth, fifth) =>
  956. {
  957. var subject = new AsyncSubject<TResult>();
  958. scheduler.Schedule(() =>
  959. {
  960. var result = default(TResult);
  961. try
  962. {
  963. result = function(first, second, third, fourth, fifth);
  964. }
  965. catch (Exception exception)
  966. {
  967. subject.OnError(exception);
  968. return;
  969. }
  970. subject.OnNext(result);
  971. subject.OnCompleted();
  972. });
  973. return subject.AsObservable();
  974. };
  975. }
  976. public virtual Func<T1, T2, T3, T4, T5, T6, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, TResult>(Func<T1, T2, T3, T4, T5, T6, TResult> function)
  977. {
  978. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  979. }
  980. public virtual Func<T1, T2, T3, T4, T5, T6, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, TResult>(Func<T1, T2, T3, T4, T5, T6, TResult> function, IScheduler scheduler)
  981. {
  982. return (first, second, third, fourth, fifth, sixth) =>
  983. {
  984. var subject = new AsyncSubject<TResult>();
  985. scheduler.Schedule(() =>
  986. {
  987. var result = default(TResult);
  988. try
  989. {
  990. result = function(first, second, third, fourth, fifth, sixth);
  991. }
  992. catch (Exception exception)
  993. {
  994. subject.OnError(exception);
  995. return;
  996. }
  997. subject.OnNext(result);
  998. subject.OnCompleted();
  999. });
  1000. return subject.AsObservable();
  1001. };
  1002. }
  1003. public virtual Func<T1, T2, T3, T4, T5, T6, T7, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, TResult> function)
  1004. {
  1005. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  1006. }
  1007. public virtual Func<T1, T2, T3, T4, T5, T6, T7, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, TResult> function, IScheduler scheduler)
  1008. {
  1009. return (first, second, third, fourth, fifth, sixth, seventh) =>
  1010. {
  1011. var subject = new AsyncSubject<TResult>();
  1012. scheduler.Schedule(() =>
  1013. {
  1014. var result = default(TResult);
  1015. try
  1016. {
  1017. result = function(first, second, third, fourth, fifth, sixth, seventh);
  1018. }
  1019. catch (Exception exception)
  1020. {
  1021. subject.OnError(exception);
  1022. return;
  1023. }
  1024. subject.OnNext(result);
  1025. subject.OnCompleted();
  1026. });
  1027. return subject.AsObservable();
  1028. };
  1029. }
  1030. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, TResult> function)
  1031. {
  1032. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  1033. }
  1034. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, TResult> function, IScheduler scheduler)
  1035. {
  1036. return (first, second, third, fourth, fifth, sixth, seventh, eight) =>
  1037. {
  1038. var subject = new AsyncSubject<TResult>();
  1039. scheduler.Schedule(() =>
  1040. {
  1041. var result = default(TResult);
  1042. try
  1043. {
  1044. result = function(first, second, third, fourth, fifth, sixth, seventh, eight);
  1045. }
  1046. catch (Exception exception)
  1047. {
  1048. subject.OnError(exception);
  1049. return;
  1050. }
  1051. subject.OnNext(result);
  1052. subject.OnCompleted();
  1053. });
  1054. return subject.AsObservable();
  1055. };
  1056. }
  1057. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult> function)
  1058. {
  1059. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  1060. }
  1061. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult> function, IScheduler scheduler)
  1062. {
  1063. return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth) =>
  1064. {
  1065. var subject = new AsyncSubject<TResult>();
  1066. scheduler.Schedule(() =>
  1067. {
  1068. var result = default(TResult);
  1069. try
  1070. {
  1071. result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth);
  1072. }
  1073. catch (Exception exception)
  1074. {
  1075. subject.OnError(exception);
  1076. return;
  1077. }
  1078. subject.OnNext(result);
  1079. subject.OnCompleted();
  1080. });
  1081. return subject.AsObservable();
  1082. };
  1083. }
  1084. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult> function)
  1085. {
  1086. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  1087. }
  1088. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult> function, IScheduler scheduler)
  1089. {
  1090. return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth) =>
  1091. {
  1092. var subject = new AsyncSubject<TResult>();
  1093. scheduler.Schedule(() =>
  1094. {
  1095. var result = default(TResult);
  1096. try
  1097. {
  1098. result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth);
  1099. }
  1100. catch (Exception exception)
  1101. {
  1102. subject.OnError(exception);
  1103. return;
  1104. }
  1105. subject.OnNext(result);
  1106. subject.OnCompleted();
  1107. });
  1108. return subject.AsObservable();
  1109. };
  1110. }
  1111. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult> function)
  1112. {
  1113. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  1114. }
  1115. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult> function, IScheduler scheduler)
  1116. {
  1117. return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh) =>
  1118. {
  1119. var subject = new AsyncSubject<TResult>();
  1120. scheduler.Schedule(() =>
  1121. {
  1122. var result = default(TResult);
  1123. try
  1124. {
  1125. result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh);
  1126. }
  1127. catch (Exception exception)
  1128. {
  1129. subject.OnError(exception);
  1130. return;
  1131. }
  1132. subject.OnNext(result);
  1133. subject.OnCompleted();
  1134. });
  1135. return subject.AsObservable();
  1136. };
  1137. }
  1138. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult> function)
  1139. {
  1140. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  1141. }
  1142. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult> function, IScheduler scheduler)
  1143. {
  1144. return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth) =>
  1145. {
  1146. var subject = new AsyncSubject<TResult>();
  1147. scheduler.Schedule(() =>
  1148. {
  1149. var result = default(TResult);
  1150. try
  1151. {
  1152. result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth);
  1153. }
  1154. catch (Exception exception)
  1155. {
  1156. subject.OnError(exception);
  1157. return;
  1158. }
  1159. subject.OnNext(result);
  1160. subject.OnCompleted();
  1161. });
  1162. return subject.AsObservable();
  1163. };
  1164. }
  1165. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult> function)
  1166. {
  1167. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  1168. }
  1169. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult> function, IScheduler scheduler)
  1170. {
  1171. return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth) =>
  1172. {
  1173. var subject = new AsyncSubject<TResult>();
  1174. scheduler.Schedule(() =>
  1175. {
  1176. var result = default(TResult);
  1177. try
  1178. {
  1179. result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth);
  1180. }
  1181. catch (Exception exception)
  1182. {
  1183. subject.OnError(exception);
  1184. return;
  1185. }
  1186. subject.OnNext(result);
  1187. subject.OnCompleted();
  1188. });
  1189. return subject.AsObservable();
  1190. };
  1191. }
  1192. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult> function)
  1193. {
  1194. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  1195. }
  1196. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult> function, IScheduler scheduler)
  1197. {
  1198. return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth) =>
  1199. {
  1200. var subject = new AsyncSubject<TResult>();
  1201. scheduler.Schedule(() =>
  1202. {
  1203. var result = default(TResult);
  1204. try
  1205. {
  1206. result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth);
  1207. }
  1208. catch (Exception exception)
  1209. {
  1210. subject.OnError(exception);
  1211. return;
  1212. }
  1213. subject.OnNext(result);
  1214. subject.OnCompleted();
  1215. });
  1216. return subject.AsObservable();
  1217. };
  1218. }
  1219. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, TResult> function)
  1220. {
  1221. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  1222. }
  1223. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, TResult> function, IScheduler scheduler)
  1224. {
  1225. return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth) =>
  1226. {
  1227. var subject = new AsyncSubject<TResult>();
  1228. scheduler.Schedule(() =>
  1229. {
  1230. var result = default(TResult);
  1231. try
  1232. {
  1233. result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth);
  1234. }
  1235. catch (Exception exception)
  1236. {
  1237. subject.OnError(exception);
  1238. return;
  1239. }
  1240. subject.OnNext(result);
  1241. subject.OnCompleted();
  1242. });
  1243. return subject.AsObservable();
  1244. };
  1245. }
  1246. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, TResult> function)
  1247. {
  1248. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  1249. }
  1250. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, TResult> function, IScheduler scheduler)
  1251. {
  1252. return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth, sixteenth) =>
  1253. {
  1254. var subject = new AsyncSubject<TResult>();
  1255. scheduler.Schedule(() =>
  1256. {
  1257. var result = default(TResult);
  1258. try
  1259. {
  1260. result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth, sixteenth);
  1261. }
  1262. catch (Exception exception)
  1263. {
  1264. subject.OnError(exception);
  1265. return;
  1266. }
  1267. subject.OnNext(result);
  1268. subject.OnCompleted();
  1269. });
  1270. return subject.AsObservable();
  1271. };
  1272. }
  1273. #endif
  1274. #endregion
  1275. #region Action
  1276. public virtual Func<IObservable<Unit>> ToAsync(Action action)
  1277. {
  1278. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1279. }
  1280. public virtual Func<IObservable<Unit>> ToAsync(Action action, IScheduler scheduler)
  1281. {
  1282. return () =>
  1283. {
  1284. var subject = new AsyncSubject<Unit>();
  1285. scheduler.Schedule(() =>
  1286. {
  1287. try
  1288. {
  1289. action();
  1290. }
  1291. catch (Exception exception)
  1292. {
  1293. subject.OnError(exception);
  1294. return;
  1295. }
  1296. subject.OnNext(Unit.Default);
  1297. subject.OnCompleted();
  1298. });
  1299. return subject.AsObservable();
  1300. };
  1301. }
  1302. public virtual Func<TSource, IObservable<Unit>> ToAsync<TSource>(Action<TSource> action)
  1303. {
  1304. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1305. }
  1306. public virtual Func<TSource, IObservable<Unit>> ToAsync<TSource>(Action<TSource> action, IScheduler scheduler)
  1307. {
  1308. return (first) =>
  1309. {
  1310. var subject = new AsyncSubject<Unit>();
  1311. scheduler.Schedule(() =>
  1312. {
  1313. try
  1314. {
  1315. action(first);
  1316. }
  1317. catch (Exception exception)
  1318. {
  1319. subject.OnError(exception);
  1320. return;
  1321. }
  1322. subject.OnNext(Unit.Default);
  1323. subject.OnCompleted();
  1324. });
  1325. return subject.AsObservable();
  1326. };
  1327. }
  1328. public virtual Func<T1, T2, IObservable<Unit>> ToAsync<T1, T2>(Action<T1, T2> action)
  1329. {
  1330. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1331. }
  1332. public virtual Func<T1, T2, IObservable<Unit>> ToAsync<T1, T2>(Action<T1, T2> action, IScheduler scheduler)
  1333. {
  1334. return (first, second) =>
  1335. {
  1336. var subject = new AsyncSubject<Unit>();
  1337. scheduler.Schedule(() =>
  1338. {
  1339. try
  1340. {
  1341. action(first, second);
  1342. }
  1343. catch (Exception exception)
  1344. {
  1345. subject.OnError(exception);
  1346. return;
  1347. }
  1348. subject.OnNext(Unit.Default);
  1349. subject.OnCompleted();
  1350. });
  1351. return subject.AsObservable();
  1352. };
  1353. }
  1354. public virtual Func<T1, T2, T3, IObservable<Unit>> ToAsync<T1, T2, T3>(Action<T1, T2, T3> action)
  1355. {
  1356. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1357. }
  1358. public virtual Func<T1, T2, T3, IObservable<Unit>> ToAsync<T1, T2, T3>(Action<T1, T2, T3> action, IScheduler scheduler)
  1359. {
  1360. return (first, second, third) =>
  1361. {
  1362. var subject = new AsyncSubject<Unit>();
  1363. scheduler.Schedule(() =>
  1364. {
  1365. try
  1366. {
  1367. action(first, second, third);
  1368. }
  1369. catch (Exception exception)
  1370. {
  1371. subject.OnError(exception);
  1372. return;
  1373. }
  1374. subject.OnNext(Unit.Default);
  1375. subject.OnCompleted();
  1376. });
  1377. return subject.AsObservable();
  1378. };
  1379. }
  1380. public virtual Func<T1, T2, T3, T4, IObservable<Unit>> ToAsync<T1, T2, T3, T4>(Action<T1, T2, T3, T4> action)
  1381. {
  1382. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1383. }
  1384. public virtual Func<T1, T2, T3, T4, IObservable<Unit>> ToAsync<T1, T2, T3, T4>(Action<T1, T2, T3, T4> action, IScheduler scheduler)
  1385. {
  1386. return (first, second, third, fourth) =>
  1387. {
  1388. var subject = new AsyncSubject<Unit>();
  1389. scheduler.Schedule(() =>
  1390. {
  1391. try
  1392. {
  1393. action(first, second, third, fourth);
  1394. }
  1395. catch (Exception exception)
  1396. {
  1397. subject.OnError(exception);
  1398. return;
  1399. }
  1400. subject.OnNext(Unit.Default);
  1401. subject.OnCompleted();
  1402. });
  1403. return subject.AsObservable();
  1404. };
  1405. }
  1406. #if !NO_LARGEARITY
  1407. public virtual Func<T1, T2, T3, T4, T5, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5>(Action<T1, T2, T3, T4, T5> action)
  1408. {
  1409. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1410. }
  1411. public virtual Func<T1, T2, T3, T4, T5, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5>(Action<T1, T2, T3, T4, T5> action, IScheduler scheduler)
  1412. {
  1413. return (first, second, third, fourth, fifth) =>
  1414. {
  1415. var subject = new AsyncSubject<Unit>();
  1416. scheduler.Schedule(() =>
  1417. {
  1418. try
  1419. {
  1420. action(first, second, third, fourth, fifth);
  1421. }
  1422. catch (Exception exception)
  1423. {
  1424. subject.OnError(exception);
  1425. return;
  1426. }
  1427. subject.OnNext(Unit.Default);
  1428. subject.OnCompleted();
  1429. });
  1430. return subject.AsObservable();
  1431. };
  1432. }
  1433. public virtual Func<T1, T2, T3, T4, T5, T6, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6>(Action<T1, T2, T3, T4, T5, T6> action)
  1434. {
  1435. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1436. }
  1437. public virtual Func<T1, T2, T3, T4, T5, T6, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6>(Action<T1, T2, T3, T4, T5, T6> action, IScheduler scheduler)
  1438. {
  1439. return (first, second, third, fourth, fifth, sixth) =>
  1440. {
  1441. var subject = new AsyncSubject<Unit>();
  1442. scheduler.Schedule(() =>
  1443. {
  1444. try
  1445. {
  1446. action(first, second, third, fourth, fifth, sixth);
  1447. }
  1448. catch (Exception exception)
  1449. {
  1450. subject.OnError(exception);
  1451. return;
  1452. }
  1453. subject.OnNext(Unit.Default);
  1454. subject.OnCompleted();
  1455. });
  1456. return subject.AsObservable();
  1457. };
  1458. }
  1459. public virtual Func<T1, T2, T3, T4, T5, T6, T7, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7>(Action<T1, T2, T3, T4, T5, T6, T7> action)
  1460. {
  1461. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1462. }
  1463. public virtual Func<T1, T2, T3, T4, T5, T6, T7, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7>(Action<T1, T2, T3, T4, T5, T6, T7> action, IScheduler scheduler)
  1464. {
  1465. return (first, second, third, fourth, fifth, sixth, seventh) =>
  1466. {
  1467. var subject = new AsyncSubject<Unit>();
  1468. scheduler.Schedule(() =>
  1469. {
  1470. try
  1471. {
  1472. action(first, second, third, fourth, fifth, sixth, seventh);
  1473. }
  1474. catch (Exception exception)
  1475. {
  1476. subject.OnError(exception);
  1477. return;
  1478. }
  1479. subject.OnNext(Unit.Default);
  1480. subject.OnCompleted();
  1481. });
  1482. return subject.AsObservable();
  1483. };
  1484. }
  1485. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8>(Action<T1, T2, T3, T4, T5, T6, T7, T8> action)
  1486. {
  1487. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1488. }
  1489. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8>(Action<T1, T2, T3, T4, T5, T6, T7, T8> action, IScheduler scheduler)
  1490. {
  1491. return (first, second, third, fourth, fifth, sixth, seventh, eight) =>
  1492. {
  1493. var subject = new AsyncSubject<Unit>();
  1494. scheduler.Schedule(() =>
  1495. {
  1496. try
  1497. {
  1498. action(first, second, third, fourth, fifth, sixth, seventh, eight);
  1499. }
  1500. catch (Exception exception)
  1501. {
  1502. subject.OnError(exception);
  1503. return;
  1504. }
  1505. subject.OnNext(Unit.Default);
  1506. subject.OnCompleted();
  1507. });
  1508. return subject.AsObservable();
  1509. };
  1510. }
  1511. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9> action)
  1512. {
  1513. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1514. }
  1515. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9> action, IScheduler scheduler)
  1516. {
  1517. return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth) =>
  1518. {
  1519. var subject = new AsyncSubject<Unit>();
  1520. scheduler.Schedule(() =>
  1521. {
  1522. try
  1523. {
  1524. action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth);
  1525. }
  1526. catch (Exception exception)
  1527. {
  1528. subject.OnError(exception);
  1529. return;
  1530. }
  1531. subject.OnNext(Unit.Default);
  1532. subject.OnCompleted();
  1533. });
  1534. return subject.AsObservable();
  1535. };
  1536. }
  1537. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> action)
  1538. {
  1539. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1540. }
  1541. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> action, IScheduler scheduler)
  1542. {
  1543. return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth) =>
  1544. {
  1545. var subject = new AsyncSubject<Unit>();
  1546. scheduler.Schedule(() =>
  1547. {
  1548. try
  1549. {
  1550. action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth);
  1551. }
  1552. catch (Exception exception)
  1553. {
  1554. subject.OnError(exception);
  1555. return;
  1556. }
  1557. subject.OnNext(Unit.Default);
  1558. subject.OnCompleted();
  1559. });
  1560. return subject.AsObservable();
  1561. };
  1562. }
  1563. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11> action)
  1564. {
  1565. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1566. }
  1567. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11> action, IScheduler scheduler)
  1568. {
  1569. return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh) =>
  1570. {
  1571. var subject = new AsyncSubject<Unit>();
  1572. scheduler.Schedule(() =>
  1573. {
  1574. try
  1575. {
  1576. action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh);
  1577. }
  1578. catch (Exception exception)
  1579. {
  1580. subject.OnError(exception);
  1581. return;
  1582. }
  1583. subject.OnNext(Unit.Default);
  1584. subject.OnCompleted();
  1585. });
  1586. return subject.AsObservable();
  1587. };
  1588. }
  1589. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12> action)
  1590. {
  1591. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1592. }
  1593. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12> action, IScheduler scheduler)
  1594. {
  1595. return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth) =>
  1596. {
  1597. var subject = new AsyncSubject<Unit>();
  1598. scheduler.Schedule(() =>
  1599. {
  1600. try
  1601. {
  1602. action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth);
  1603. }
  1604. catch (Exception exception)
  1605. {
  1606. subject.OnError(exception);
  1607. return;
  1608. }
  1609. subject.OnNext(Unit.Default);
  1610. subject.OnCompleted();
  1611. });
  1612. return subject.AsObservable();
  1613. };
  1614. }
  1615. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13> action)
  1616. {
  1617. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1618. }
  1619. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13> action, IScheduler scheduler)
  1620. {
  1621. return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth) =>
  1622. {
  1623. var subject = new AsyncSubject<Unit>();
  1624. scheduler.Schedule(() =>
  1625. {
  1626. try
  1627. {
  1628. action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth);
  1629. }
  1630. catch (Exception exception)
  1631. {
  1632. subject.OnError(exception);
  1633. return;
  1634. }
  1635. subject.OnNext(Unit.Default);
  1636. subject.OnCompleted();
  1637. });
  1638. return subject.AsObservable();
  1639. };
  1640. }
  1641. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14> action)
  1642. {
  1643. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1644. }
  1645. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14> action, IScheduler scheduler)
  1646. {
  1647. return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth) =>
  1648. {
  1649. var subject = new AsyncSubject<Unit>();
  1650. scheduler.Schedule(() =>
  1651. {
  1652. try
  1653. {
  1654. action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth);
  1655. }
  1656. catch (Exception exception)
  1657. {
  1658. subject.OnError(exception);
  1659. return;
  1660. }
  1661. subject.OnNext(Unit.Default);
  1662. subject.OnCompleted();
  1663. });
  1664. return subject.AsObservable();
  1665. };
  1666. }
  1667. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15> action)
  1668. {
  1669. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1670. }
  1671. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15> action, IScheduler scheduler)
  1672. {
  1673. return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth) =>
  1674. {
  1675. var subject = new AsyncSubject<Unit>();
  1676. scheduler.Schedule(() =>
  1677. {
  1678. try
  1679. {
  1680. action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth);
  1681. }
  1682. catch (Exception exception)
  1683. {
  1684. subject.OnError(exception);
  1685. return;
  1686. }
  1687. subject.OnNext(Unit.Default);
  1688. subject.OnCompleted();
  1689. });
  1690. return subject.AsObservable();
  1691. };
  1692. }
  1693. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16> action)
  1694. {
  1695. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1696. }
  1697. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16> action, IScheduler scheduler)
  1698. {
  1699. return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth, sixteenth) =>
  1700. {
  1701. var subject = new AsyncSubject<Unit>();
  1702. scheduler.Schedule(() =>
  1703. {
  1704. try
  1705. {
  1706. action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth, sixteenth);
  1707. }
  1708. catch (Exception exception)
  1709. {
  1710. subject.OnError(exception);
  1711. return;
  1712. }
  1713. subject.OnNext(Unit.Default);
  1714. subject.OnCompleted();
  1715. });
  1716. return subject.AsObservable();
  1717. };
  1718. }
  1719. #endif
  1720. #endregion
  1721. #endregion
  1722. }
  1723. }