1
0

QueryLanguage.Async.cs 75 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. using System.Reactive.Subjects;
  7. #if !NO_TPL
  8. using System.Reactive.Threading.Tasks;
  9. using System.Threading;
  10. using System.Threading.Tasks;
  11. #endif
  12. namespace System.Reactive.Linq
  13. {
  14. internal partial class QueryLanguage
  15. {
  16. #region FromAsyncPattern
  17. #region Func
  18. public virtual Func<IObservable<TResult>> FromAsyncPattern<TResult>(Func<AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  19. {
  20. return () =>
  21. {
  22. var subject = new AsyncSubject<TResult>();
  23. try
  24. {
  25. begin(iar =>
  26. {
  27. // Note: Even if the callback completes synchronously, outgoing On* calls
  28. // cannot throw in user code since there can't be any subscribers
  29. // to the AsyncSubject yet. Therefore, there is no need to protect
  30. // against exceptions that'd be caught below and sent (incorrectly)
  31. // into the Observable.Throw sequence being constructed.
  32. TResult result;
  33. try
  34. {
  35. result = end(iar);
  36. }
  37. catch (Exception exception)
  38. {
  39. subject.OnError(exception);
  40. return;
  41. }
  42. subject.OnNext(result);
  43. subject.OnCompleted();
  44. }, null);
  45. }
  46. catch (Exception exception)
  47. {
  48. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  49. }
  50. return subject.AsObservable();
  51. };
  52. }
  53. public virtual Func<T1, IObservable<TResult>> FromAsyncPattern<T1, TResult>(Func<T1, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  54. {
  55. return x =>
  56. {
  57. var subject = new AsyncSubject<TResult>();
  58. try
  59. {
  60. begin(x, iar =>
  61. {
  62. // See remark on FromAsyncPattern<TResult>.
  63. TResult result;
  64. try
  65. {
  66. result = end(iar);
  67. }
  68. catch (Exception exception)
  69. {
  70. subject.OnError(exception);
  71. return;
  72. }
  73. subject.OnNext(result);
  74. subject.OnCompleted();
  75. }, null);
  76. }
  77. catch (Exception exception)
  78. {
  79. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  80. }
  81. return subject.AsObservable();
  82. };
  83. }
  84. public virtual Func<T1, T2, IObservable<TResult>> FromAsyncPattern<T1, T2, TResult>(Func<T1, T2, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  85. {
  86. return (x, y) =>
  87. {
  88. var subject = new AsyncSubject<TResult>();
  89. try
  90. {
  91. begin(x, y, iar =>
  92. {
  93. // See remark on FromAsyncPattern<TResult>.
  94. TResult result;
  95. try
  96. {
  97. result = end(iar);
  98. }
  99. catch (Exception exception)
  100. {
  101. subject.OnError(exception);
  102. return;
  103. }
  104. subject.OnNext(result);
  105. subject.OnCompleted();
  106. }, null);
  107. }
  108. catch (Exception exception)
  109. {
  110. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  111. }
  112. return subject.AsObservable();
  113. };
  114. }
  115. #if !NO_LARGEARITY
  116. public virtual Func<T1, T2, T3, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, TResult>(Func<T1, T2, T3, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  117. {
  118. return (x, y, z) =>
  119. {
  120. var subject = new AsyncSubject<TResult>();
  121. try
  122. {
  123. begin(x, y, z, iar =>
  124. {
  125. // See remark on FromAsyncPattern<TResult>.
  126. TResult result;
  127. try
  128. {
  129. result = end(iar);
  130. }
  131. catch (Exception exception)
  132. {
  133. subject.OnError(exception);
  134. return;
  135. }
  136. subject.OnNext(result);
  137. subject.OnCompleted();
  138. }, null);
  139. }
  140. catch (Exception exception)
  141. {
  142. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  143. }
  144. return subject.AsObservable();
  145. };
  146. }
  147. public virtual Func<T1, T2, T3, T4, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, TResult>(Func<T1, T2, T3, T4, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  148. {
  149. return (x, y, z, a) =>
  150. {
  151. var subject = new AsyncSubject<TResult>();
  152. try
  153. {
  154. begin(x, y, z, a, iar =>
  155. {
  156. // See remark on FromAsyncPattern<TResult>.
  157. TResult result;
  158. try
  159. {
  160. result = end(iar);
  161. }
  162. catch (Exception exception)
  163. {
  164. subject.OnError(exception);
  165. return;
  166. }
  167. subject.OnNext(result);
  168. subject.OnCompleted();
  169. }, null);
  170. }
  171. catch (Exception exception)
  172. {
  173. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  174. }
  175. return subject.AsObservable();
  176. };
  177. }
  178. public virtual Func<T1, T2, T3, T4, T5, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, TResult>(Func<T1, T2, T3, T4, T5, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  179. {
  180. return (x, y, z, a, b) =>
  181. {
  182. var subject = new AsyncSubject<TResult>();
  183. try
  184. {
  185. begin(x, y, z, a, b, iar =>
  186. {
  187. // See remark on FromAsyncPattern<TResult>.
  188. TResult result;
  189. try
  190. {
  191. result = end(iar);
  192. }
  193. catch (Exception exception)
  194. {
  195. subject.OnError(exception);
  196. return;
  197. }
  198. subject.OnNext(result);
  199. subject.OnCompleted();
  200. }, null);
  201. }
  202. catch (Exception exception)
  203. {
  204. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  205. }
  206. return subject.AsObservable();
  207. };
  208. }
  209. public virtual Func<T1, T2, T3, T4, T5, T6, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, TResult>(Func<T1, T2, T3, T4, T5, T6, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  210. {
  211. return (x, y, z, a, b, c) =>
  212. {
  213. var subject = new AsyncSubject<TResult>();
  214. try
  215. {
  216. begin(x, y, z, a, b, c, iar =>
  217. {
  218. // See remark on FromAsyncPattern<TResult>.
  219. TResult result;
  220. try
  221. {
  222. result = end(iar);
  223. }
  224. catch (Exception exception)
  225. {
  226. subject.OnError(exception);
  227. return;
  228. }
  229. subject.OnNext(result);
  230. subject.OnCompleted();
  231. }, null);
  232. }
  233. catch (Exception exception)
  234. {
  235. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  236. }
  237. return subject.AsObservable();
  238. };
  239. }
  240. public virtual Func<T1, T2, T3, T4, T5, T6, T7, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  241. {
  242. return (x, y, z, a, b, c, d) =>
  243. {
  244. var subject = new AsyncSubject<TResult>();
  245. try
  246. {
  247. begin(x, y, z, a, b, c, d, iar =>
  248. {
  249. // See remark on FromAsyncPattern<TResult>.
  250. TResult result;
  251. try
  252. {
  253. result = end(iar);
  254. }
  255. catch (Exception exception)
  256. {
  257. subject.OnError(exception);
  258. return;
  259. }
  260. subject.OnNext(result);
  261. subject.OnCompleted();
  262. }, null);
  263. }
  264. catch (Exception exception)
  265. {
  266. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  267. }
  268. return subject.AsObservable();
  269. };
  270. }
  271. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  272. {
  273. return (x, y, z, a, b, c, d, e) =>
  274. {
  275. var subject = new AsyncSubject<TResult>();
  276. try
  277. {
  278. begin(x, y, z, a, b, c, d, e, iar =>
  279. {
  280. // See remark on FromAsyncPattern<TResult>.
  281. TResult result;
  282. try
  283. {
  284. result = end(iar);
  285. }
  286. catch (Exception exception)
  287. {
  288. subject.OnError(exception);
  289. return;
  290. }
  291. subject.OnNext(result);
  292. subject.OnCompleted();
  293. }, null);
  294. }
  295. catch (Exception exception)
  296. {
  297. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  298. }
  299. return subject.AsObservable();
  300. };
  301. }
  302. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  303. {
  304. return (x, y, z, a, b, c, d, e, f) =>
  305. {
  306. var subject = new AsyncSubject<TResult>();
  307. try
  308. {
  309. begin(x, y, z, a, b, c, d, e, f, iar =>
  310. {
  311. // See remark on FromAsyncPattern<TResult>.
  312. TResult result;
  313. try
  314. {
  315. result = end(iar);
  316. }
  317. catch (Exception exception)
  318. {
  319. subject.OnError(exception);
  320. return;
  321. }
  322. subject.OnNext(result);
  323. subject.OnCompleted();
  324. }, null);
  325. }
  326. catch (Exception exception)
  327. {
  328. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  329. }
  330. return subject.AsObservable();
  331. };
  332. }
  333. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  334. {
  335. return (x, y, z, a, b, c, d, e, f, g) =>
  336. {
  337. var subject = new AsyncSubject<TResult>();
  338. try
  339. {
  340. begin(x, y, z, a, b, c, d, e, f, g, iar =>
  341. {
  342. // See remark on FromAsyncPattern<TResult>.
  343. TResult result;
  344. try
  345. {
  346. result = end(iar);
  347. }
  348. catch (Exception exception)
  349. {
  350. subject.OnError(exception);
  351. return;
  352. }
  353. subject.OnNext(result);
  354. subject.OnCompleted();
  355. }, null);
  356. }
  357. catch (Exception exception)
  358. {
  359. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  360. }
  361. return subject.AsObservable();
  362. };
  363. }
  364. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  365. {
  366. return (x, y, z, a, b, c, d, e, f, g, h) =>
  367. {
  368. var subject = new AsyncSubject<TResult>();
  369. try
  370. {
  371. begin(x, y, z, a, b, c, d, e, f, g, h, iar =>
  372. {
  373. // See remark on FromAsyncPattern<TResult>.
  374. TResult result;
  375. try
  376. {
  377. result = end(iar);
  378. }
  379. catch (Exception exception)
  380. {
  381. subject.OnError(exception);
  382. return;
  383. }
  384. subject.OnNext(result);
  385. subject.OnCompleted();
  386. }, null);
  387. }
  388. catch (Exception exception)
  389. {
  390. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  391. }
  392. return subject.AsObservable();
  393. };
  394. }
  395. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  396. {
  397. return (x, y, z, a, b, c, d, e, f, g, h, i) =>
  398. {
  399. var subject = new AsyncSubject<TResult>();
  400. try
  401. {
  402. begin(x, y, z, a, b, c, d, e, f, g, h, i, iar =>
  403. {
  404. // See remark on FromAsyncPattern<TResult>.
  405. TResult result;
  406. try
  407. {
  408. result = end(iar);
  409. }
  410. catch (Exception exception)
  411. {
  412. subject.OnError(exception);
  413. return;
  414. }
  415. subject.OnNext(result);
  416. subject.OnCompleted();
  417. }, null);
  418. }
  419. catch (Exception exception)
  420. {
  421. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  422. }
  423. return subject.AsObservable();
  424. };
  425. }
  426. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  427. {
  428. return (x, y, z, a, b, c, d, e, f, g, h, i, j) =>
  429. {
  430. var subject = new AsyncSubject<TResult>();
  431. try
  432. {
  433. begin(x, y, z, a, b, c, d, e, f, g, h, i, j, iar =>
  434. {
  435. // See remark on FromAsyncPattern<TResult>.
  436. TResult result;
  437. try
  438. {
  439. result = end(iar);
  440. }
  441. catch (Exception exception)
  442. {
  443. subject.OnError(exception);
  444. return;
  445. }
  446. subject.OnNext(result);
  447. subject.OnCompleted();
  448. }, null);
  449. }
  450. catch (Exception exception)
  451. {
  452. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  453. }
  454. return subject.AsObservable();
  455. };
  456. }
  457. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
  458. {
  459. return (x, y, z, a, b, c, d, e, f, g, h, i, j, k) =>
  460. {
  461. var subject = new AsyncSubject<TResult>();
  462. try
  463. {
  464. begin(x, y, z, a, b, c, d, e, f, g, h, i, j, k, iar =>
  465. {
  466. // See remark on FromAsyncPattern<TResult>.
  467. TResult result;
  468. try
  469. {
  470. result = end(iar);
  471. }
  472. catch (Exception exception)
  473. {
  474. subject.OnError(exception);
  475. return;
  476. }
  477. subject.OnNext(result);
  478. subject.OnCompleted();
  479. }, null);
  480. }
  481. catch (Exception exception)
  482. {
  483. return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
  484. }
  485. return subject.AsObservable();
  486. };
  487. }
  488. #endif
  489. #endregion
  490. #region Action
  491. public virtual Func<IObservable<Unit>> FromAsyncPattern(Func<AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  492. {
  493. return FromAsyncPattern(begin, iar =>
  494. {
  495. end(iar);
  496. return Unit.Default;
  497. });
  498. }
  499. public virtual Func<T1, IObservable<Unit>> FromAsyncPattern<T1>(Func<T1, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  500. {
  501. return FromAsyncPattern(begin, iar =>
  502. {
  503. end(iar);
  504. return Unit.Default;
  505. });
  506. }
  507. public virtual Func<T1, T2, IObservable<Unit>> FromAsyncPattern<T1, T2>(Func<T1, T2, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  508. {
  509. return FromAsyncPattern(begin, iar =>
  510. {
  511. end(iar);
  512. return Unit.Default;
  513. });
  514. }
  515. #if !NO_LARGEARITY
  516. public virtual Func<T1, T2, T3, IObservable<Unit>> FromAsyncPattern<T1, T2, T3>(Func<T1, T2, T3, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  517. {
  518. return FromAsyncPattern(begin, iar =>
  519. {
  520. end(iar);
  521. return Unit.Default;
  522. });
  523. }
  524. public virtual Func<T1, T2, T3, T4, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4>(Func<T1, T2, T3, T4, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  525. {
  526. return FromAsyncPattern(begin, iar =>
  527. {
  528. end(iar);
  529. return Unit.Default;
  530. });
  531. }
  532. public virtual Func<T1, T2, T3, T4, T5, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5>(Func<T1, T2, T3, T4, T5, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  533. {
  534. return FromAsyncPattern(begin, iar =>
  535. {
  536. end(iar);
  537. return Unit.Default;
  538. });
  539. }
  540. public virtual Func<T1, T2, T3, T4, T5, T6, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6>(Func<T1, T2, T3, T4, T5, T6, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  541. {
  542. return FromAsyncPattern(begin, iar =>
  543. {
  544. end(iar);
  545. return Unit.Default;
  546. });
  547. }
  548. public virtual Func<T1, T2, T3, T4, T5, T6, T7, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7>(Func<T1, T2, T3, T4, T5, T6, T7, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  549. {
  550. return FromAsyncPattern(begin, iar =>
  551. {
  552. end(iar);
  553. return Unit.Default;
  554. });
  555. }
  556. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8>(Func<T1, T2, T3, T4, T5, T6, T7, T8, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  557. {
  558. return FromAsyncPattern(begin, iar =>
  559. {
  560. end(iar);
  561. return Unit.Default;
  562. });
  563. }
  564. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  565. {
  566. return FromAsyncPattern(begin, iar =>
  567. {
  568. end(iar);
  569. return Unit.Default;
  570. });
  571. }
  572. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  573. {
  574. return FromAsyncPattern(begin, iar =>
  575. {
  576. end(iar);
  577. return Unit.Default;
  578. });
  579. }
  580. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  581. {
  582. return FromAsyncPattern(begin, iar =>
  583. {
  584. end(iar);
  585. return Unit.Default;
  586. });
  587. }
  588. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  589. {
  590. return FromAsyncPattern(begin, iar =>
  591. {
  592. end(iar);
  593. return Unit.Default;
  594. });
  595. }
  596. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  597. {
  598. return FromAsyncPattern(begin, iar =>
  599. {
  600. end(iar);
  601. return Unit.Default;
  602. });
  603. }
  604. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
  605. {
  606. return FromAsyncPattern(begin, iar =>
  607. {
  608. end(iar);
  609. return Unit.Default;
  610. });
  611. }
  612. #endif
  613. #endregion
  614. #endregion
  615. #region Start[Async]
  616. #region Func
  617. public virtual IObservable<TSource> Start<TSource>(Func<TSource> function)
  618. {
  619. return ToAsync(function)();
  620. }
  621. public virtual IObservable<TSource> Start<TSource>(Func<TSource> function, IScheduler scheduler)
  622. {
  623. return ToAsync(function, scheduler)();
  624. }
  625. #if !NO_TPL
  626. public virtual IObservable<TSource> StartAsync<TSource>(Func<Task<TSource>> functionAsync)
  627. {
  628. return StartAsyncImpl(functionAsync, null);
  629. }
  630. public virtual IObservable<TSource> StartAsync<TSource>(Func<Task<TSource>> functionAsync, IScheduler scheduler)
  631. {
  632. return StartAsyncImpl(functionAsync, scheduler);
  633. }
  634. private IObservable<TSource> StartAsyncImpl<TSource>(Func<Task<TSource>> functionAsync, IScheduler scheduler)
  635. {
  636. var task = default(Task<TSource>);
  637. try
  638. {
  639. task = functionAsync();
  640. }
  641. catch (Exception exception)
  642. {
  643. return Throw<TSource>(exception);
  644. }
  645. if (scheduler != null)
  646. {
  647. return task.ToObservable(scheduler);
  648. }
  649. else
  650. {
  651. return task.ToObservable();
  652. }
  653. }
  654. public virtual IObservable<TSource> StartAsync<TSource>(Func<CancellationToken, Task<TSource>> functionAsync)
  655. {
  656. return StartAsyncImpl(functionAsync, null);
  657. }
  658. public virtual IObservable<TSource> StartAsync<TSource>(Func<CancellationToken, Task<TSource>> functionAsync, IScheduler scheduler)
  659. {
  660. return StartAsyncImpl(functionAsync, scheduler);
  661. }
  662. private IObservable<TSource> StartAsyncImpl<TSource>(Func<CancellationToken, Task<TSource>> functionAsync, IScheduler scheduler)
  663. {
  664. var cancellable = new CancellationDisposable();
  665. var task = default(Task<TSource>);
  666. try
  667. {
  668. task = functionAsync(cancellable.Token);
  669. }
  670. catch (Exception exception)
  671. {
  672. return Throw<TSource>(exception);
  673. }
  674. var result = default(IObservable<TSource>);
  675. if (scheduler != null)
  676. {
  677. result = task.ToObservable(scheduler);
  678. }
  679. else
  680. {
  681. result = task.ToObservable();
  682. }
  683. return new AnonymousObservable<TSource>(observer =>
  684. {
  685. //
  686. // [OK] Use of unsafe Subscribe: result is an AsyncSubject<TSource>.
  687. //
  688. var subscription = result.Subscribe/*Unsafe*/(observer);
  689. return StableCompositeDisposable.Create(cancellable, subscription);
  690. });
  691. }
  692. #endif
  693. #endregion
  694. #region Action
  695. public virtual IObservable<Unit> Start(Action action)
  696. {
  697. return ToAsync(action, SchedulerDefaults.AsyncConversions)();
  698. }
  699. public virtual IObservable<Unit> Start(Action action, IScheduler scheduler)
  700. {
  701. return ToAsync(action, scheduler)();
  702. }
  703. #if !NO_TPL
  704. public virtual IObservable<Unit> StartAsync(Func<Task> actionAsync)
  705. {
  706. return StartAsyncImpl(actionAsync, null);
  707. }
  708. public virtual IObservable<Unit> StartAsync(Func<Task> actionAsync, IScheduler scheduler)
  709. {
  710. return StartAsyncImpl(actionAsync, scheduler);
  711. }
  712. private IObservable<Unit> StartAsyncImpl(Func<Task> actionAsync, IScheduler scheduler)
  713. {
  714. var task = default(Task);
  715. try
  716. {
  717. task = actionAsync();
  718. }
  719. catch (Exception exception)
  720. {
  721. return Throw<Unit>(exception);
  722. }
  723. if (scheduler != null)
  724. {
  725. return task.ToObservable(scheduler);
  726. }
  727. else
  728. {
  729. return task.ToObservable();
  730. }
  731. }
  732. public virtual IObservable<Unit> StartAsync(Func<CancellationToken, Task> actionAsync)
  733. {
  734. return StartAsyncImpl(actionAsync, null);
  735. }
  736. public virtual IObservable<Unit> StartAsync(Func<CancellationToken, Task> actionAsync, IScheduler scheduler)
  737. {
  738. return StartAsyncImpl(actionAsync, scheduler);
  739. }
  740. private IObservable<Unit> StartAsyncImpl(Func<CancellationToken, Task> actionAsync, IScheduler scheduler)
  741. {
  742. var cancellable = new CancellationDisposable();
  743. var task = default(Task);
  744. try
  745. {
  746. task = actionAsync(cancellable.Token);
  747. }
  748. catch (Exception exception)
  749. {
  750. return Throw<Unit>(exception);
  751. }
  752. var result = default(IObservable<Unit>);
  753. if (scheduler != null)
  754. {
  755. result = task.ToObservable(scheduler);
  756. }
  757. else
  758. {
  759. result = task.ToObservable();
  760. }
  761. return new AnonymousObservable<Unit>(observer =>
  762. {
  763. //
  764. // [OK] Use of unsafe Subscribe: result is an AsyncSubject<TSource>.
  765. //
  766. var subscription = result.Subscribe/*Unsafe*/(observer);
  767. return StableCompositeDisposable.Create(cancellable, subscription);
  768. });
  769. }
  770. #endif
  771. #endregion
  772. #endregion
  773. #region FromAsync
  774. #if !NO_TPL
  775. #region Func
  776. public virtual IObservable<TResult> FromAsync<TResult>(Func<Task<TResult>> functionAsync)
  777. {
  778. return Defer(() => StartAsync(functionAsync));
  779. }
  780. public virtual IObservable<TResult> FromAsync<TResult>(Func<CancellationToken, Task<TResult>> functionAsync)
  781. {
  782. return Defer(() => StartAsync(functionAsync));
  783. }
  784. public virtual IObservable<TResult> FromAsync<TResult>(Func<Task<TResult>> functionAsync, IScheduler scheduler)
  785. {
  786. return Defer(() => StartAsync(functionAsync, scheduler));
  787. }
  788. public virtual IObservable<TResult> FromAsync<TResult>(Func<CancellationToken, Task<TResult>> functionAsync, IScheduler scheduler)
  789. {
  790. return Defer(() => StartAsync(functionAsync, scheduler));
  791. }
  792. #endregion
  793. #region Action
  794. public virtual IObservable<Unit> FromAsync(Func<Task> actionAsync)
  795. {
  796. return Defer(() => StartAsync(actionAsync));
  797. }
  798. public virtual IObservable<Unit> FromAsync(Func<CancellationToken, Task> actionAsync)
  799. {
  800. return Defer(() => StartAsync(actionAsync));
  801. }
  802. public virtual IObservable<Unit> FromAsync(Func<Task> actionAsync, IScheduler scheduler)
  803. {
  804. return Defer(() => StartAsync(actionAsync, scheduler));
  805. }
  806. public virtual IObservable<Unit> FromAsync(Func<CancellationToken, Task> actionAsync, IScheduler scheduler)
  807. {
  808. return Defer(() => StartAsync(actionAsync, scheduler));
  809. }
  810. #endregion
  811. #endif
  812. #endregion
  813. #region ToAsync
  814. #region Func
  815. public virtual Func<IObservable<TResult>> ToAsync<TResult>(Func<TResult> function)
  816. {
  817. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  818. }
  819. public virtual Func<IObservable<TResult>> ToAsync<TResult>(Func<TResult> function, IScheduler scheduler)
  820. {
  821. return () =>
  822. {
  823. var subject = new AsyncSubject<TResult>();
  824. scheduler.Schedule(() =>
  825. {
  826. var result = default(TResult);
  827. try
  828. {
  829. result = function();
  830. }
  831. catch (Exception exception)
  832. {
  833. subject.OnError(exception);
  834. return;
  835. }
  836. subject.OnNext(result);
  837. subject.OnCompleted();
  838. });
  839. return subject.AsObservable();
  840. };
  841. }
  842. public virtual Func<T, IObservable<TResult>> ToAsync<T, TResult>(Func<T, TResult> function)
  843. {
  844. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  845. }
  846. public virtual Func<T, IObservable<TResult>> ToAsync<T, TResult>(Func<T, TResult> function, IScheduler scheduler)
  847. {
  848. return (first) =>
  849. {
  850. var subject = new AsyncSubject<TResult>();
  851. scheduler.Schedule(() =>
  852. {
  853. var result = default(TResult);
  854. try
  855. {
  856. result = function(first);
  857. }
  858. catch (Exception exception)
  859. {
  860. subject.OnError(exception);
  861. return;
  862. }
  863. subject.OnNext(result);
  864. subject.OnCompleted();
  865. });
  866. return subject.AsObservable();
  867. };
  868. }
  869. public virtual Func<T1, T2, IObservable<TResult>> ToAsync<T1, T2, TResult>(Func<T1, T2, TResult> function)
  870. {
  871. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  872. }
  873. public virtual Func<T1, T2, IObservable<TResult>> ToAsync<T1, T2, TResult>(Func<T1, T2, TResult> function, IScheduler scheduler)
  874. {
  875. return (first, second) =>
  876. {
  877. var subject = new AsyncSubject<TResult>();
  878. scheduler.Schedule(() =>
  879. {
  880. var result = default(TResult);
  881. try
  882. {
  883. result = function(first, second);
  884. }
  885. catch (Exception exception)
  886. {
  887. subject.OnError(exception);
  888. return;
  889. }
  890. subject.OnNext(result);
  891. subject.OnCompleted();
  892. });
  893. return subject.AsObservable();
  894. };
  895. }
  896. public virtual Func<T1, T2, T3, IObservable<TResult>> ToAsync<T1, T2, T3, TResult>(Func<T1, T2, T3, TResult> function)
  897. {
  898. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  899. }
  900. public virtual Func<T1, T2, T3, IObservable<TResult>> ToAsync<T1, T2, T3, TResult>(Func<T1, T2, T3, TResult> function, IScheduler scheduler)
  901. {
  902. return (first, second, third) =>
  903. {
  904. var subject = new AsyncSubject<TResult>();
  905. scheduler.Schedule(() =>
  906. {
  907. var result = default(TResult);
  908. try
  909. {
  910. result = function(first, second, third);
  911. }
  912. catch (Exception exception)
  913. {
  914. subject.OnError(exception);
  915. return;
  916. }
  917. subject.OnNext(result);
  918. subject.OnCompleted();
  919. });
  920. return subject.AsObservable();
  921. };
  922. }
  923. public virtual Func<T1, T2, T3, T4, IObservable<TResult>> ToAsync<T1, T2, T3, T4, TResult>(Func<T1, T2, T3, T4, TResult> function)
  924. {
  925. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  926. }
  927. public virtual Func<T1, T2, T3, T4, IObservable<TResult>> ToAsync<T1, T2, T3, T4, TResult>(Func<T1, T2, T3, T4, TResult> function, IScheduler scheduler)
  928. {
  929. return (first, second, third, fourth) =>
  930. {
  931. var subject = new AsyncSubject<TResult>();
  932. scheduler.Schedule(() =>
  933. {
  934. var result = default(TResult);
  935. try
  936. {
  937. result = function(first, second, third, fourth);
  938. }
  939. catch (Exception exception)
  940. {
  941. subject.OnError(exception);
  942. return;
  943. }
  944. subject.OnNext(result);
  945. subject.OnCompleted();
  946. });
  947. return subject.AsObservable();
  948. };
  949. }
  950. #if !NO_LARGEARITY
  951. public virtual Func<T1, T2, T3, T4, T5, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, TResult>(Func<T1, T2, T3, T4, T5, TResult> function)
  952. {
  953. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  954. }
  955. public virtual Func<T1, T2, T3, T4, T5, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, TResult>(Func<T1, T2, T3, T4, T5, TResult> function, IScheduler scheduler)
  956. {
  957. return (first, second, third, fourth, fifth) =>
  958. {
  959. var subject = new AsyncSubject<TResult>();
  960. scheduler.Schedule(() =>
  961. {
  962. var result = default(TResult);
  963. try
  964. {
  965. result = function(first, second, third, fourth, fifth);
  966. }
  967. catch (Exception exception)
  968. {
  969. subject.OnError(exception);
  970. return;
  971. }
  972. subject.OnNext(result);
  973. subject.OnCompleted();
  974. });
  975. return subject.AsObservable();
  976. };
  977. }
  978. public virtual Func<T1, T2, T3, T4, T5, T6, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, TResult>(Func<T1, T2, T3, T4, T5, T6, TResult> function)
  979. {
  980. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  981. }
  982. public virtual Func<T1, T2, T3, T4, T5, T6, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, TResult>(Func<T1, T2, T3, T4, T5, T6, TResult> function, IScheduler scheduler)
  983. {
  984. return (first, second, third, fourth, fifth, sixth) =>
  985. {
  986. var subject = new AsyncSubject<TResult>();
  987. scheduler.Schedule(() =>
  988. {
  989. var result = default(TResult);
  990. try
  991. {
  992. result = function(first, second, third, fourth, fifth, sixth);
  993. }
  994. catch (Exception exception)
  995. {
  996. subject.OnError(exception);
  997. return;
  998. }
  999. subject.OnNext(result);
  1000. subject.OnCompleted();
  1001. });
  1002. return subject.AsObservable();
  1003. };
  1004. }
  1005. public virtual Func<T1, T2, T3, T4, T5, T6, T7, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, TResult> function)
  1006. {
  1007. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  1008. }
  1009. public virtual Func<T1, T2, T3, T4, T5, T6, T7, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, TResult> function, IScheduler scheduler)
  1010. {
  1011. return (first, second, third, fourth, fifth, sixth, seventh) =>
  1012. {
  1013. var subject = new AsyncSubject<TResult>();
  1014. scheduler.Schedule(() =>
  1015. {
  1016. var result = default(TResult);
  1017. try
  1018. {
  1019. result = function(first, second, third, fourth, fifth, sixth, seventh);
  1020. }
  1021. catch (Exception exception)
  1022. {
  1023. subject.OnError(exception);
  1024. return;
  1025. }
  1026. subject.OnNext(result);
  1027. subject.OnCompleted();
  1028. });
  1029. return subject.AsObservable();
  1030. };
  1031. }
  1032. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, TResult> function)
  1033. {
  1034. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  1035. }
  1036. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, TResult> function, IScheduler scheduler)
  1037. {
  1038. return (first, second, third, fourth, fifth, sixth, seventh, eight) =>
  1039. {
  1040. var subject = new AsyncSubject<TResult>();
  1041. scheduler.Schedule(() =>
  1042. {
  1043. var result = default(TResult);
  1044. try
  1045. {
  1046. result = function(first, second, third, fourth, fifth, sixth, seventh, eight);
  1047. }
  1048. catch (Exception exception)
  1049. {
  1050. subject.OnError(exception);
  1051. return;
  1052. }
  1053. subject.OnNext(result);
  1054. subject.OnCompleted();
  1055. });
  1056. return subject.AsObservable();
  1057. };
  1058. }
  1059. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult> function)
  1060. {
  1061. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  1062. }
  1063. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult> function, IScheduler scheduler)
  1064. {
  1065. return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth) =>
  1066. {
  1067. var subject = new AsyncSubject<TResult>();
  1068. scheduler.Schedule(() =>
  1069. {
  1070. var result = default(TResult);
  1071. try
  1072. {
  1073. result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth);
  1074. }
  1075. catch (Exception exception)
  1076. {
  1077. subject.OnError(exception);
  1078. return;
  1079. }
  1080. subject.OnNext(result);
  1081. subject.OnCompleted();
  1082. });
  1083. return subject.AsObservable();
  1084. };
  1085. }
  1086. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult> function)
  1087. {
  1088. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  1089. }
  1090. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult> function, IScheduler scheduler)
  1091. {
  1092. return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth) =>
  1093. {
  1094. var subject = new AsyncSubject<TResult>();
  1095. scheduler.Schedule(() =>
  1096. {
  1097. var result = default(TResult);
  1098. try
  1099. {
  1100. result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth);
  1101. }
  1102. catch (Exception exception)
  1103. {
  1104. subject.OnError(exception);
  1105. return;
  1106. }
  1107. subject.OnNext(result);
  1108. subject.OnCompleted();
  1109. });
  1110. return subject.AsObservable();
  1111. };
  1112. }
  1113. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult> function)
  1114. {
  1115. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  1116. }
  1117. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult> function, IScheduler scheduler)
  1118. {
  1119. return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh) =>
  1120. {
  1121. var subject = new AsyncSubject<TResult>();
  1122. scheduler.Schedule(() =>
  1123. {
  1124. var result = default(TResult);
  1125. try
  1126. {
  1127. result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh);
  1128. }
  1129. catch (Exception exception)
  1130. {
  1131. subject.OnError(exception);
  1132. return;
  1133. }
  1134. subject.OnNext(result);
  1135. subject.OnCompleted();
  1136. });
  1137. return subject.AsObservable();
  1138. };
  1139. }
  1140. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult> function)
  1141. {
  1142. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  1143. }
  1144. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult> function, IScheduler scheduler)
  1145. {
  1146. return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth) =>
  1147. {
  1148. var subject = new AsyncSubject<TResult>();
  1149. scheduler.Schedule(() =>
  1150. {
  1151. var result = default(TResult);
  1152. try
  1153. {
  1154. result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth);
  1155. }
  1156. catch (Exception exception)
  1157. {
  1158. subject.OnError(exception);
  1159. return;
  1160. }
  1161. subject.OnNext(result);
  1162. subject.OnCompleted();
  1163. });
  1164. return subject.AsObservable();
  1165. };
  1166. }
  1167. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult> function)
  1168. {
  1169. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  1170. }
  1171. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult> function, IScheduler scheduler)
  1172. {
  1173. return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth) =>
  1174. {
  1175. var subject = new AsyncSubject<TResult>();
  1176. scheduler.Schedule(() =>
  1177. {
  1178. var result = default(TResult);
  1179. try
  1180. {
  1181. result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth);
  1182. }
  1183. catch (Exception exception)
  1184. {
  1185. subject.OnError(exception);
  1186. return;
  1187. }
  1188. subject.OnNext(result);
  1189. subject.OnCompleted();
  1190. });
  1191. return subject.AsObservable();
  1192. };
  1193. }
  1194. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult> function)
  1195. {
  1196. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  1197. }
  1198. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult> function, IScheduler scheduler)
  1199. {
  1200. return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth) =>
  1201. {
  1202. var subject = new AsyncSubject<TResult>();
  1203. scheduler.Schedule(() =>
  1204. {
  1205. var result = default(TResult);
  1206. try
  1207. {
  1208. result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth);
  1209. }
  1210. catch (Exception exception)
  1211. {
  1212. subject.OnError(exception);
  1213. return;
  1214. }
  1215. subject.OnNext(result);
  1216. subject.OnCompleted();
  1217. });
  1218. return subject.AsObservable();
  1219. };
  1220. }
  1221. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, TResult> function)
  1222. {
  1223. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  1224. }
  1225. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, TResult> function, IScheduler scheduler)
  1226. {
  1227. return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth) =>
  1228. {
  1229. var subject = new AsyncSubject<TResult>();
  1230. scheduler.Schedule(() =>
  1231. {
  1232. var result = default(TResult);
  1233. try
  1234. {
  1235. result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth);
  1236. }
  1237. catch (Exception exception)
  1238. {
  1239. subject.OnError(exception);
  1240. return;
  1241. }
  1242. subject.OnNext(result);
  1243. subject.OnCompleted();
  1244. });
  1245. return subject.AsObservable();
  1246. };
  1247. }
  1248. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, TResult> function)
  1249. {
  1250. return ToAsync(function, SchedulerDefaults.AsyncConversions);
  1251. }
  1252. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, TResult> function, IScheduler scheduler)
  1253. {
  1254. return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth, sixteenth) =>
  1255. {
  1256. var subject = new AsyncSubject<TResult>();
  1257. scheduler.Schedule(() =>
  1258. {
  1259. var result = default(TResult);
  1260. try
  1261. {
  1262. result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth, sixteenth);
  1263. }
  1264. catch (Exception exception)
  1265. {
  1266. subject.OnError(exception);
  1267. return;
  1268. }
  1269. subject.OnNext(result);
  1270. subject.OnCompleted();
  1271. });
  1272. return subject.AsObservable();
  1273. };
  1274. }
  1275. #endif
  1276. #endregion
  1277. #region Action
  1278. public virtual Func<IObservable<Unit>> ToAsync(Action action)
  1279. {
  1280. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1281. }
  1282. public virtual Func<IObservable<Unit>> ToAsync(Action action, IScheduler scheduler)
  1283. {
  1284. return () =>
  1285. {
  1286. var subject = new AsyncSubject<Unit>();
  1287. scheduler.Schedule(() =>
  1288. {
  1289. try
  1290. {
  1291. action();
  1292. }
  1293. catch (Exception exception)
  1294. {
  1295. subject.OnError(exception);
  1296. return;
  1297. }
  1298. subject.OnNext(Unit.Default);
  1299. subject.OnCompleted();
  1300. });
  1301. return subject.AsObservable();
  1302. };
  1303. }
  1304. public virtual Func<TSource, IObservable<Unit>> ToAsync<TSource>(Action<TSource> action)
  1305. {
  1306. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1307. }
  1308. public virtual Func<TSource, IObservable<Unit>> ToAsync<TSource>(Action<TSource> action, IScheduler scheduler)
  1309. {
  1310. return (first) =>
  1311. {
  1312. var subject = new AsyncSubject<Unit>();
  1313. scheduler.Schedule(() =>
  1314. {
  1315. try
  1316. {
  1317. action(first);
  1318. }
  1319. catch (Exception exception)
  1320. {
  1321. subject.OnError(exception);
  1322. return;
  1323. }
  1324. subject.OnNext(Unit.Default);
  1325. subject.OnCompleted();
  1326. });
  1327. return subject.AsObservable();
  1328. };
  1329. }
  1330. public virtual Func<T1, T2, IObservable<Unit>> ToAsync<T1, T2>(Action<T1, T2> action)
  1331. {
  1332. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1333. }
  1334. public virtual Func<T1, T2, IObservable<Unit>> ToAsync<T1, T2>(Action<T1, T2> action, IScheduler scheduler)
  1335. {
  1336. return (first, second) =>
  1337. {
  1338. var subject = new AsyncSubject<Unit>();
  1339. scheduler.Schedule(() =>
  1340. {
  1341. try
  1342. {
  1343. action(first, second);
  1344. }
  1345. catch (Exception exception)
  1346. {
  1347. subject.OnError(exception);
  1348. return;
  1349. }
  1350. subject.OnNext(Unit.Default);
  1351. subject.OnCompleted();
  1352. });
  1353. return subject.AsObservable();
  1354. };
  1355. }
  1356. public virtual Func<T1, T2, T3, IObservable<Unit>> ToAsync<T1, T2, T3>(Action<T1, T2, T3> action)
  1357. {
  1358. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1359. }
  1360. public virtual Func<T1, T2, T3, IObservable<Unit>> ToAsync<T1, T2, T3>(Action<T1, T2, T3> action, IScheduler scheduler)
  1361. {
  1362. return (first, second, third) =>
  1363. {
  1364. var subject = new AsyncSubject<Unit>();
  1365. scheduler.Schedule(() =>
  1366. {
  1367. try
  1368. {
  1369. action(first, second, third);
  1370. }
  1371. catch (Exception exception)
  1372. {
  1373. subject.OnError(exception);
  1374. return;
  1375. }
  1376. subject.OnNext(Unit.Default);
  1377. subject.OnCompleted();
  1378. });
  1379. return subject.AsObservable();
  1380. };
  1381. }
  1382. public virtual Func<T1, T2, T3, T4, IObservable<Unit>> ToAsync<T1, T2, T3, T4>(Action<T1, T2, T3, T4> action)
  1383. {
  1384. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1385. }
  1386. public virtual Func<T1, T2, T3, T4, IObservable<Unit>> ToAsync<T1, T2, T3, T4>(Action<T1, T2, T3, T4> action, IScheduler scheduler)
  1387. {
  1388. return (first, second, third, fourth) =>
  1389. {
  1390. var subject = new AsyncSubject<Unit>();
  1391. scheduler.Schedule(() =>
  1392. {
  1393. try
  1394. {
  1395. action(first, second, third, fourth);
  1396. }
  1397. catch (Exception exception)
  1398. {
  1399. subject.OnError(exception);
  1400. return;
  1401. }
  1402. subject.OnNext(Unit.Default);
  1403. subject.OnCompleted();
  1404. });
  1405. return subject.AsObservable();
  1406. };
  1407. }
  1408. #if !NO_LARGEARITY
  1409. public virtual Func<T1, T2, T3, T4, T5, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5>(Action<T1, T2, T3, T4, T5> action)
  1410. {
  1411. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1412. }
  1413. public virtual Func<T1, T2, T3, T4, T5, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5>(Action<T1, T2, T3, T4, T5> action, IScheduler scheduler)
  1414. {
  1415. return (first, second, third, fourth, fifth) =>
  1416. {
  1417. var subject = new AsyncSubject<Unit>();
  1418. scheduler.Schedule(() =>
  1419. {
  1420. try
  1421. {
  1422. action(first, second, third, fourth, fifth);
  1423. }
  1424. catch (Exception exception)
  1425. {
  1426. subject.OnError(exception);
  1427. return;
  1428. }
  1429. subject.OnNext(Unit.Default);
  1430. subject.OnCompleted();
  1431. });
  1432. return subject.AsObservable();
  1433. };
  1434. }
  1435. public virtual Func<T1, T2, T3, T4, T5, T6, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6>(Action<T1, T2, T3, T4, T5, T6> action)
  1436. {
  1437. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1438. }
  1439. public virtual Func<T1, T2, T3, T4, T5, T6, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6>(Action<T1, T2, T3, T4, T5, T6> action, IScheduler scheduler)
  1440. {
  1441. return (first, second, third, fourth, fifth, sixth) =>
  1442. {
  1443. var subject = new AsyncSubject<Unit>();
  1444. scheduler.Schedule(() =>
  1445. {
  1446. try
  1447. {
  1448. action(first, second, third, fourth, fifth, sixth);
  1449. }
  1450. catch (Exception exception)
  1451. {
  1452. subject.OnError(exception);
  1453. return;
  1454. }
  1455. subject.OnNext(Unit.Default);
  1456. subject.OnCompleted();
  1457. });
  1458. return subject.AsObservable();
  1459. };
  1460. }
  1461. public virtual Func<T1, T2, T3, T4, T5, T6, T7, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7>(Action<T1, T2, T3, T4, T5, T6, T7> action)
  1462. {
  1463. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1464. }
  1465. public virtual Func<T1, T2, T3, T4, T5, T6, T7, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7>(Action<T1, T2, T3, T4, T5, T6, T7> action, IScheduler scheduler)
  1466. {
  1467. return (first, second, third, fourth, fifth, sixth, seventh) =>
  1468. {
  1469. var subject = new AsyncSubject<Unit>();
  1470. scheduler.Schedule(() =>
  1471. {
  1472. try
  1473. {
  1474. action(first, second, third, fourth, fifth, sixth, seventh);
  1475. }
  1476. catch (Exception exception)
  1477. {
  1478. subject.OnError(exception);
  1479. return;
  1480. }
  1481. subject.OnNext(Unit.Default);
  1482. subject.OnCompleted();
  1483. });
  1484. return subject.AsObservable();
  1485. };
  1486. }
  1487. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8>(Action<T1, T2, T3, T4, T5, T6, T7, T8> action)
  1488. {
  1489. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1490. }
  1491. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8>(Action<T1, T2, T3, T4, T5, T6, T7, T8> action, IScheduler scheduler)
  1492. {
  1493. return (first, second, third, fourth, fifth, sixth, seventh, eight) =>
  1494. {
  1495. var subject = new AsyncSubject<Unit>();
  1496. scheduler.Schedule(() =>
  1497. {
  1498. try
  1499. {
  1500. action(first, second, third, fourth, fifth, sixth, seventh, eight);
  1501. }
  1502. catch (Exception exception)
  1503. {
  1504. subject.OnError(exception);
  1505. return;
  1506. }
  1507. subject.OnNext(Unit.Default);
  1508. subject.OnCompleted();
  1509. });
  1510. return subject.AsObservable();
  1511. };
  1512. }
  1513. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9> action)
  1514. {
  1515. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1516. }
  1517. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9> action, IScheduler scheduler)
  1518. {
  1519. return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth) =>
  1520. {
  1521. var subject = new AsyncSubject<Unit>();
  1522. scheduler.Schedule(() =>
  1523. {
  1524. try
  1525. {
  1526. action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth);
  1527. }
  1528. catch (Exception exception)
  1529. {
  1530. subject.OnError(exception);
  1531. return;
  1532. }
  1533. subject.OnNext(Unit.Default);
  1534. subject.OnCompleted();
  1535. });
  1536. return subject.AsObservable();
  1537. };
  1538. }
  1539. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> action)
  1540. {
  1541. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1542. }
  1543. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> action, IScheduler scheduler)
  1544. {
  1545. return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth) =>
  1546. {
  1547. var subject = new AsyncSubject<Unit>();
  1548. scheduler.Schedule(() =>
  1549. {
  1550. try
  1551. {
  1552. action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth);
  1553. }
  1554. catch (Exception exception)
  1555. {
  1556. subject.OnError(exception);
  1557. return;
  1558. }
  1559. subject.OnNext(Unit.Default);
  1560. subject.OnCompleted();
  1561. });
  1562. return subject.AsObservable();
  1563. };
  1564. }
  1565. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11> action)
  1566. {
  1567. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1568. }
  1569. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11> action, IScheduler scheduler)
  1570. {
  1571. return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh) =>
  1572. {
  1573. var subject = new AsyncSubject<Unit>();
  1574. scheduler.Schedule(() =>
  1575. {
  1576. try
  1577. {
  1578. action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh);
  1579. }
  1580. catch (Exception exception)
  1581. {
  1582. subject.OnError(exception);
  1583. return;
  1584. }
  1585. subject.OnNext(Unit.Default);
  1586. subject.OnCompleted();
  1587. });
  1588. return subject.AsObservable();
  1589. };
  1590. }
  1591. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12> action)
  1592. {
  1593. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1594. }
  1595. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12> action, IScheduler scheduler)
  1596. {
  1597. return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth) =>
  1598. {
  1599. var subject = new AsyncSubject<Unit>();
  1600. scheduler.Schedule(() =>
  1601. {
  1602. try
  1603. {
  1604. action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth);
  1605. }
  1606. catch (Exception exception)
  1607. {
  1608. subject.OnError(exception);
  1609. return;
  1610. }
  1611. subject.OnNext(Unit.Default);
  1612. subject.OnCompleted();
  1613. });
  1614. return subject.AsObservable();
  1615. };
  1616. }
  1617. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13> action)
  1618. {
  1619. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1620. }
  1621. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13> action, IScheduler scheduler)
  1622. {
  1623. return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth) =>
  1624. {
  1625. var subject = new AsyncSubject<Unit>();
  1626. scheduler.Schedule(() =>
  1627. {
  1628. try
  1629. {
  1630. action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth);
  1631. }
  1632. catch (Exception exception)
  1633. {
  1634. subject.OnError(exception);
  1635. return;
  1636. }
  1637. subject.OnNext(Unit.Default);
  1638. subject.OnCompleted();
  1639. });
  1640. return subject.AsObservable();
  1641. };
  1642. }
  1643. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14> action)
  1644. {
  1645. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1646. }
  1647. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14> action, IScheduler scheduler)
  1648. {
  1649. return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth) =>
  1650. {
  1651. var subject = new AsyncSubject<Unit>();
  1652. scheduler.Schedule(() =>
  1653. {
  1654. try
  1655. {
  1656. action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth);
  1657. }
  1658. catch (Exception exception)
  1659. {
  1660. subject.OnError(exception);
  1661. return;
  1662. }
  1663. subject.OnNext(Unit.Default);
  1664. subject.OnCompleted();
  1665. });
  1666. return subject.AsObservable();
  1667. };
  1668. }
  1669. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15> action)
  1670. {
  1671. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1672. }
  1673. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15> action, IScheduler scheduler)
  1674. {
  1675. return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth) =>
  1676. {
  1677. var subject = new AsyncSubject<Unit>();
  1678. scheduler.Schedule(() =>
  1679. {
  1680. try
  1681. {
  1682. action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth);
  1683. }
  1684. catch (Exception exception)
  1685. {
  1686. subject.OnError(exception);
  1687. return;
  1688. }
  1689. subject.OnNext(Unit.Default);
  1690. subject.OnCompleted();
  1691. });
  1692. return subject.AsObservable();
  1693. };
  1694. }
  1695. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16> action)
  1696. {
  1697. return ToAsync(action, SchedulerDefaults.AsyncConversions);
  1698. }
  1699. public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16> action, IScheduler scheduler)
  1700. {
  1701. return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth, sixteenth) =>
  1702. {
  1703. var subject = new AsyncSubject<Unit>();
  1704. scheduler.Schedule(() =>
  1705. {
  1706. try
  1707. {
  1708. action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth, sixteenth);
  1709. }
  1710. catch (Exception exception)
  1711. {
  1712. subject.OnError(exception);
  1713. return;
  1714. }
  1715. subject.OnNext(Unit.Default);
  1716. subject.OnCompleted();
  1717. });
  1718. return subject.AsObservable();
  1719. };
  1720. }
  1721. #endif
  1722. #endregion
  1723. #endregion
  1724. }
  1725. }