// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Subjects; using System.Reactive.Threading.Tasks; using System.Threading; using System.Threading.Tasks; namespace System.Reactive.Linq { internal partial class QueryLanguage { #region FromAsyncPattern #region Func public virtual Func> FromAsyncPattern(Func begin, Func end) { return () => { var subject = new AsyncSubject(); try { begin(iar => { // Note: Even if the callback completes synchronously, outgoing On* calls // cannot throw in user code since there can't be any subscribers // to the AsyncSubject yet. Therefore, there is no need to protect // against exceptions that'd be caught below and sent (incorrectly) // into the Observable.Throw sequence being constructed. TResult result; try { result = end(iar); } catch (Exception exception) { subject.OnError(exception); return; } subject.OnNext(result); subject.OnCompleted(); }, null); } catch (Exception exception) { return Observable.Throw(exception, SchedulerDefaults.AsyncConversions); } return subject.AsObservable(); }; } public virtual Func> FromAsyncPattern(Func begin, Func end) { return x => { var subject = new AsyncSubject(); try { begin(x, iar => { // See remark on FromAsyncPattern. TResult result; try { result = end(iar); } catch (Exception exception) { subject.OnError(exception); return; } subject.OnNext(result); subject.OnCompleted(); }, null); } catch (Exception exception) { return Observable.Throw(exception, SchedulerDefaults.AsyncConversions); } return subject.AsObservable(); }; } public virtual Func> FromAsyncPattern(Func begin, Func end) { return (x, y) => { var subject = new AsyncSubject(); try { begin(x, y, iar => { // See remark on FromAsyncPattern. 
TResult result; try { result = end(iar); } catch (Exception exception) { subject.OnError(exception); return; } subject.OnNext(result); subject.OnCompleted(); }, null); } catch (Exception exception) { return Observable.Throw(exception, SchedulerDefaults.AsyncConversions); } return subject.AsObservable(); }; } public virtual Func> FromAsyncPattern(Func begin, Func end) { return (x, y, z) => { var subject = new AsyncSubject(); try { begin(x, y, z, iar => { // See remark on FromAsyncPattern. TResult result; try { result = end(iar); } catch (Exception exception) { subject.OnError(exception); return; } subject.OnNext(result); subject.OnCompleted(); }, null); } catch (Exception exception) { return Observable.Throw(exception, SchedulerDefaults.AsyncConversions); } return subject.AsObservable(); }; } public virtual Func> FromAsyncPattern(Func begin, Func end) { return (x, y, z, a) => { var subject = new AsyncSubject(); try { begin(x, y, z, a, iar => { // See remark on FromAsyncPattern. TResult result; try { result = end(iar); } catch (Exception exception) { subject.OnError(exception); return; } subject.OnNext(result); subject.OnCompleted(); }, null); } catch (Exception exception) { return Observable.Throw(exception, SchedulerDefaults.AsyncConversions); } return subject.AsObservable(); }; } public virtual Func> FromAsyncPattern(Func begin, Func end) { return (x, y, z, a, b) => { var subject = new AsyncSubject(); try { begin(x, y, z, a, b, iar => { // See remark on FromAsyncPattern. 
TResult result; try { result = end(iar); } catch (Exception exception) { subject.OnError(exception); return; } subject.OnNext(result); subject.OnCompleted(); }, null); } catch (Exception exception) { return Observable.Throw(exception, SchedulerDefaults.AsyncConversions); } return subject.AsObservable(); }; } public virtual Func> FromAsyncPattern(Func begin, Func end) { return (x, y, z, a, b, c) => { var subject = new AsyncSubject(); try { begin(x, y, z, a, b, c, iar => { // See remark on FromAsyncPattern. TResult result; try { result = end(iar); } catch (Exception exception) { subject.OnError(exception); return; } subject.OnNext(result); subject.OnCompleted(); }, null); } catch (Exception exception) { return Observable.Throw(exception, SchedulerDefaults.AsyncConversions); } return subject.AsObservable(); }; } public virtual Func> FromAsyncPattern(Func begin, Func end) { return (x, y, z, a, b, c, d) => { var subject = new AsyncSubject(); try { begin(x, y, z, a, b, c, d, iar => { // See remark on FromAsyncPattern. TResult result; try { result = end(iar); } catch (Exception exception) { subject.OnError(exception); return; } subject.OnNext(result); subject.OnCompleted(); }, null); } catch (Exception exception) { return Observable.Throw(exception, SchedulerDefaults.AsyncConversions); } return subject.AsObservable(); }; } public virtual Func> FromAsyncPattern(Func begin, Func end) { return (x, y, z, a, b, c, d, e) => { var subject = new AsyncSubject(); try { begin(x, y, z, a, b, c, d, e, iar => { // See remark on FromAsyncPattern. 
TResult result; try { result = end(iar); } catch (Exception exception) { subject.OnError(exception); return; } subject.OnNext(result); subject.OnCompleted(); }, null); } catch (Exception exception) { return Observable.Throw(exception, SchedulerDefaults.AsyncConversions); } return subject.AsObservable(); }; } public virtual Func> FromAsyncPattern(Func begin, Func end) { return (x, y, z, a, b, c, d, e, f) => { var subject = new AsyncSubject(); try { begin(x, y, z, a, b, c, d, e, f, iar => { // See remark on FromAsyncPattern. TResult result; try { result = end(iar); } catch (Exception exception) { subject.OnError(exception); return; } subject.OnNext(result); subject.OnCompleted(); }, null); } catch (Exception exception) { return Observable.Throw(exception, SchedulerDefaults.AsyncConversions); } return subject.AsObservable(); }; } public virtual Func> FromAsyncPattern(Func begin, Func end) { return (x, y, z, a, b, c, d, e, f, g) => { var subject = new AsyncSubject(); try { begin(x, y, z, a, b, c, d, e, f, g, iar => { // See remark on FromAsyncPattern. TResult result; try { result = end(iar); } catch (Exception exception) { subject.OnError(exception); return; } subject.OnNext(result); subject.OnCompleted(); }, null); } catch (Exception exception) { return Observable.Throw(exception, SchedulerDefaults.AsyncConversions); } return subject.AsObservable(); }; } public virtual Func> FromAsyncPattern(Func begin, Func end) { return (x, y, z, a, b, c, d, e, f, g, h) => { var subject = new AsyncSubject(); try { begin(x, y, z, a, b, c, d, e, f, g, h, iar => { // See remark on FromAsyncPattern. 
TResult result; try { result = end(iar); } catch (Exception exception) { subject.OnError(exception); return; } subject.OnNext(result); subject.OnCompleted(); }, null); } catch (Exception exception) { return Observable.Throw(exception, SchedulerDefaults.AsyncConversions); } return subject.AsObservable(); }; } public virtual Func> FromAsyncPattern(Func begin, Func end) { return (x, y, z, a, b, c, d, e, f, g, h, i) => { var subject = new AsyncSubject(); try { begin(x, y, z, a, b, c, d, e, f, g, h, i, iar => { // See remark on FromAsyncPattern. TResult result; try { result = end(iar); } catch (Exception exception) { subject.OnError(exception); return; } subject.OnNext(result); subject.OnCompleted(); }, null); } catch (Exception exception) { return Observable.Throw(exception, SchedulerDefaults.AsyncConversions); } return subject.AsObservable(); }; } public virtual Func> FromAsyncPattern(Func begin, Func end) { return (x, y, z, a, b, c, d, e, f, g, h, i, j) => { var subject = new AsyncSubject(); try { begin(x, y, z, a, b, c, d, e, f, g, h, i, j, iar => { // See remark on FromAsyncPattern. TResult result; try { result = end(iar); } catch (Exception exception) { subject.OnError(exception); return; } subject.OnNext(result); subject.OnCompleted(); }, null); } catch (Exception exception) { return Observable.Throw(exception, SchedulerDefaults.AsyncConversions); } return subject.AsObservable(); }; } public virtual Func> FromAsyncPattern(Func begin, Func end) { return (x, y, z, a, b, c, d, e, f, g, h, i, j, k) => { var subject = new AsyncSubject(); try { begin(x, y, z, a, b, c, d, e, f, g, h, i, j, k, iar => { // See remark on FromAsyncPattern. 
TResult result; try { result = end(iar); } catch (Exception exception) { subject.OnError(exception); return; } subject.OnNext(result); subject.OnCompleted(); }, null); } catch (Exception exception) { return Observable.Throw(exception, SchedulerDefaults.AsyncConversions); } return subject.AsObservable(); }; } #endregion #region Action public virtual Func> FromAsyncPattern(Func begin, Action end) { return FromAsyncPattern(begin, iar => { end(iar); return Unit.Default; }); } public virtual Func> FromAsyncPattern(Func begin, Action end) { return FromAsyncPattern(begin, iar => { end(iar); return Unit.Default; }); } public virtual Func> FromAsyncPattern(Func begin, Action end) { return FromAsyncPattern(begin, iar => { end(iar); return Unit.Default; }); } public virtual Func> FromAsyncPattern(Func begin, Action end) { return FromAsyncPattern(begin, iar => { end(iar); return Unit.Default; }); } public virtual Func> FromAsyncPattern(Func begin, Action end) { return FromAsyncPattern(begin, iar => { end(iar); return Unit.Default; }); } public virtual Func> FromAsyncPattern(Func begin, Action end) { return FromAsyncPattern(begin, iar => { end(iar); return Unit.Default; }); } public virtual Func> FromAsyncPattern(Func begin, Action end) { return FromAsyncPattern(begin, iar => { end(iar); return Unit.Default; }); } public virtual Func> FromAsyncPattern(Func begin, Action end) { return FromAsyncPattern(begin, iar => { end(iar); return Unit.Default; }); } public virtual Func> FromAsyncPattern(Func begin, Action end) { return FromAsyncPattern(begin, iar => { end(iar); return Unit.Default; }); } public virtual Func> FromAsyncPattern(Func begin, Action end) { return FromAsyncPattern(begin, iar => { end(iar); return Unit.Default; }); } public virtual Func> FromAsyncPattern(Func begin, Action end) { return FromAsyncPattern(begin, iar => { end(iar); return Unit.Default; }); } public virtual Func> FromAsyncPattern(Func begin, Action end) { return FromAsyncPattern(begin, iar => { 
end(iar); return Unit.Default; }); } public virtual Func> FromAsyncPattern(Func begin, Action end) { return FromAsyncPattern(begin, iar => { end(iar); return Unit.Default; }); } public virtual Func> FromAsyncPattern(Func begin, Action end) { return FromAsyncPattern(begin, iar => { end(iar); return Unit.Default; }); } public virtual Func> FromAsyncPattern(Func begin, Action end) { return FromAsyncPattern(begin, iar => { end(iar); return Unit.Default; }); } #endregion #endregion #region Start[Async] #region Func public virtual IObservable Start(Func function) { return ToAsync(function)(); } public virtual IObservable Start(Func function, IScheduler scheduler) { return ToAsync(function, scheduler)(); } public virtual IObservable StartAsync(Func> functionAsync) { return StartAsyncImpl(functionAsync, null); } public virtual IObservable StartAsync(Func> functionAsync, IScheduler scheduler) { return StartAsyncImpl(functionAsync, scheduler); } private IObservable StartAsyncImpl(Func> functionAsync, IScheduler scheduler) { var task = default(Task); try { task = functionAsync(); } catch (Exception exception) { return Throw(exception); } if (scheduler != null) { return task.ToObservable(scheduler); } return task.ToObservable(); } public virtual IObservable StartAsync(Func> functionAsync) { return StartAsyncImpl(functionAsync, null); } public virtual IObservable StartAsync(Func> functionAsync, IScheduler scheduler) { return StartAsyncImpl(functionAsync, scheduler); } private IObservable StartAsyncImpl(Func> functionAsync, IScheduler scheduler) { var cancellable = new CancellationDisposable(); var task = default(Task); try { task = functionAsync(cancellable.Token); } catch (Exception exception) { return Throw(exception); } var result = default(IObservable); if (scheduler != null) { result = task.ToObservable(scheduler); } else { result = task.ToObservable(); } return new StartAsyncObservable(cancellable, result); } private sealed class StartAsyncObservable : ObservableBase { 
private readonly CancellationDisposable _cancellable; private readonly IObservable _result; public StartAsyncObservable(CancellationDisposable cancellable, IObservable result) { _cancellable = cancellable; _result = result; } protected override IDisposable SubscribeCore(IObserver observer) { // // [OK] Use of unsafe Subscribe: result is an AsyncSubject. // var subscription = _result.Subscribe/*Unsafe*/(observer); return StableCompositeDisposable.Create(_cancellable, subscription); } } #endregion #region Action public virtual IObservable Start(Action action) { return ToAsync(action, SchedulerDefaults.AsyncConversions)(); } public virtual IObservable Start(Action action, IScheduler scheduler) { return ToAsync(action, scheduler)(); } public virtual IObservable StartAsync(Func actionAsync) { return StartAsyncImpl(actionAsync, null); } public virtual IObservable StartAsync(Func actionAsync, IScheduler scheduler) { return StartAsyncImpl(actionAsync, scheduler); } private IObservable StartAsyncImpl(Func actionAsync, IScheduler scheduler) { var task = default(Task); try { task = actionAsync(); } catch (Exception exception) { return Throw(exception); } if (scheduler != null) { return task.ToObservable(scheduler); } return task.ToObservable(); } public virtual IObservable StartAsync(Func actionAsync) { return StartAsyncImpl(actionAsync, null); } public virtual IObservable StartAsync(Func actionAsync, IScheduler scheduler) { return StartAsyncImpl(actionAsync, scheduler); } private IObservable StartAsyncImpl(Func actionAsync, IScheduler scheduler) { var cancellable = new CancellationDisposable(); var task = default(Task); try { task = actionAsync(cancellable.Token); } catch (Exception exception) { return Throw(exception); } var result = default(IObservable); if (scheduler != null) { result = task.ToObservable(scheduler); } else { result = task.ToObservable(); } return new StartAsyncObservable(cancellable, result); } #endregion #endregion #region FromAsync #region Func public 
virtual IObservable FromAsync(Func> functionAsync) { return Defer(() => StartAsync(functionAsync)); } public virtual IObservable FromAsync(Func> functionAsync) { return Defer(() => StartAsync(functionAsync)); } public virtual IObservable FromAsync(Func> functionAsync, IScheduler scheduler) { return Defer(() => StartAsync(functionAsync, scheduler)); } public virtual IObservable FromAsync(Func> functionAsync, IScheduler scheduler) { return Defer(() => StartAsync(functionAsync, scheduler)); } #endregion #region Action public virtual IObservable FromAsync(Func actionAsync) { return Defer(() => StartAsync(actionAsync)); } public virtual IObservable FromAsync(Func actionAsync) { return Defer(() => StartAsync(actionAsync)); } public virtual IObservable FromAsync(Func actionAsync, IScheduler scheduler) { return Defer(() => StartAsync(actionAsync, scheduler)); } public virtual IObservable FromAsync(Func actionAsync, IScheduler scheduler) { return Defer(() => StartAsync(actionAsync, scheduler)); } #endregion #endregion #region ToAsync #region Func public virtual Func> ToAsync(Func function) { return ToAsync(function, SchedulerDefaults.AsyncConversions); } public virtual Func> ToAsync(Func function, IScheduler scheduler) { return () => { var subject = new AsyncSubject(); scheduler.ScheduleAction((function, subject), state => { var result = default(TResult); try { result = state.function(); } catch (Exception exception) { state.subject.OnError(exception); return; } state.subject.OnNext(result); state.subject.OnCompleted(); }); return subject.AsObservable(); }; } public virtual Func> ToAsync(Func function) { return ToAsync(function, SchedulerDefaults.AsyncConversions); } public virtual Func> ToAsync(Func function, IScheduler scheduler) { return first => { var subject = new AsyncSubject(); scheduler.ScheduleAction((function, subject, first), state => { var result = default(TResult); try { result = state.function(state.first); } catch (Exception exception) { 
state.subject.OnError(exception); return; } state.subject.OnNext(result); state.subject.OnCompleted(); }); return subject.AsObservable(); }; } public virtual Func> ToAsync(Func function) { return ToAsync(function, SchedulerDefaults.AsyncConversions); } public virtual Func> ToAsync(Func function, IScheduler scheduler) { return (first, second) => { var subject = new AsyncSubject(); scheduler.ScheduleAction((subject, function, first, second), state => { var result = default(TResult); try { result = state.function(state.first, state.second); } catch (Exception exception) { state.subject.OnError(exception); return; } state.subject.OnNext(result); state.subject.OnCompleted(); }); return subject.AsObservable(); }; } public virtual Func> ToAsync(Func function) { return ToAsync(function, SchedulerDefaults.AsyncConversions); } public virtual Func> ToAsync(Func function, IScheduler scheduler) { return (first, second, third) => { var subject = new AsyncSubject(); scheduler.ScheduleAction((subject, function, first, second, third), state => { var result = default(TResult); try { result = state.function(state.first, state.second, state.third); } catch (Exception exception) { state.subject.OnError(exception); return; } state.subject.OnNext(result); state.subject.OnCompleted(); }); return subject.AsObservable(); }; } public virtual Func> ToAsync(Func function) { return ToAsync(function, SchedulerDefaults.AsyncConversions); } public virtual Func> ToAsync(Func function, IScheduler scheduler) { return (first, second, third, fourth) => { var subject = new AsyncSubject(); scheduler.ScheduleAction((subject, function, first, second, third, fourth), state => { var result = default(TResult); try { result = state.function(state.first, state.second, state.third, state.fourth); } catch (Exception exception) { state.subject.OnError(exception); return; } state.subject.OnNext(result); state.subject.OnCompleted(); }); return subject.AsObservable(); }; } public virtual Func> ToAsync(Func function) 
{ return ToAsync(function, SchedulerDefaults.AsyncConversions); } public virtual Func> ToAsync(Func function, IScheduler scheduler) { return (first, second, third, fourth, fifth) => { var subject = new AsyncSubject(); scheduler.ScheduleAction((subject, function, first, second, third, fourth, fifth), state => { var result = default(TResult); try { result = state.function(state.first, state.second, state.third, state.fourth, state.fifth); } catch (Exception exception) { state.subject.OnError(exception); return; } state.subject.OnNext(result); state.subject.OnCompleted(); }); return subject.AsObservable(); }; } public virtual Func> ToAsync(Func function) { return ToAsync(function, SchedulerDefaults.AsyncConversions); } public virtual Func> ToAsync(Func function, IScheduler scheduler) { return (first, second, third, fourth, fifth, sixth) => { var subject = new AsyncSubject(); scheduler.ScheduleAction((subject, function, first, second, third, fourth, fifth, sixth), state => { var result = default(TResult); try { result = state.function(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth); } catch (Exception exception) { state.subject.OnError(exception); return; } state.subject.OnNext(result); state.subject.OnCompleted(); }); return subject.AsObservable(); }; } public virtual Func> ToAsync(Func function) { return ToAsync(function, SchedulerDefaults.AsyncConversions); } public virtual Func> ToAsync(Func function, IScheduler scheduler) { return (first, second, third, fourth, fifth, sixth, seventh) => { var subject = new AsyncSubject(); scheduler.ScheduleAction((subject, function, first, second, third, fourth, fifth, sixth, seventh), state => { var result = default(TResult); try { result = state.function(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh); } catch (Exception exception) { state.subject.OnError(exception); return; } state.subject.OnNext(result); state.subject.OnCompleted(); }); return 
subject.AsObservable(); }; } public virtual Func> ToAsync(Func function) { return ToAsync(function, SchedulerDefaults.AsyncConversions); } public virtual Func> ToAsync(Func function, IScheduler scheduler) { return (first, second, third, fourth, fifth, sixth, seventh, eight) => { var subject = new AsyncSubject(); scheduler.ScheduleAction((subject, function, first, second, third, fourth, fifth, sixth, seventh, eight), state => { var result = default(TResult); try { result = state.function(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eight); } catch (Exception exception) { state.subject.OnError(exception); return; } state.subject.OnNext(result); state.subject.OnCompleted(); }); return subject.AsObservable(); }; } public virtual Func> ToAsync(Func function) { return ToAsync(function, SchedulerDefaults.AsyncConversions); } public virtual Func> ToAsync(Func function, IScheduler scheduler) { return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth) => { var subject = new AsyncSubject(); scheduler.ScheduleAction((subject, function, first, second, third, fourth, fifth, sixth, seventh, eight, ninth), state => { var result = default(TResult); try { result = state.function(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eight, state.ninth); } catch (Exception exception) { state.subject.OnError(exception); return; } state.subject.OnNext(result); state.subject.OnCompleted(); }); return subject.AsObservable(); }; } public virtual Func> ToAsync(Func function) { return ToAsync(function, SchedulerDefaults.AsyncConversions); } public virtual Func> ToAsync(Func function, IScheduler scheduler) { return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth) => { var subject = new AsyncSubject(); scheduler.ScheduleAction((subject, function, first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth), state => { var result = 
default(TResult); try { result = state.function(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eight, state.ninth, state.tenth); } catch (Exception exception) { state.subject.OnError(exception); return; } state.subject.OnNext(result); state.subject.OnCompleted(); }); return subject.AsObservable(); }; } public virtual Func> ToAsync(Func function) { return ToAsync(function, SchedulerDefaults.AsyncConversions); } public virtual Func> ToAsync(Func function, IScheduler scheduler) { return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh) => { var subject = new AsyncSubject(); scheduler.ScheduleAction((subject, function, first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh), state => { var result = default(TResult); try { result = state.function(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eight, state.ninth, state.tenth, state.eleventh); } catch (Exception exception) { state.subject.OnError(exception); return; } state.subject.OnNext(result); state.subject.OnCompleted(); }); return subject.AsObservable(); }; } public virtual Func> ToAsync(Func function) { return ToAsync(function, SchedulerDefaults.AsyncConversions); } public virtual Func> ToAsync(Func function, IScheduler scheduler) { return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth) => { var subject = new AsyncSubject(); scheduler.ScheduleAction((subject, function, first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth), state => { var result = default(TResult); try { result = state.function(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eight, state.ninth, state.tenth, state.eleventh, state.twelfth); } catch (Exception exception) { state.subject.OnError(exception); return; } state.subject.OnNext(result); 
state.subject.OnCompleted(); }); return subject.AsObservable(); }; } public virtual Func> ToAsync(Func function) { return ToAsync(function, SchedulerDefaults.AsyncConversions); } public virtual Func> ToAsync(Func function, IScheduler scheduler) { return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth) => { var subject = new AsyncSubject(); scheduler.ScheduleAction((subject, function, first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth), state => { var result = default(TResult); try { result = state.function(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eight, state.ninth, state.tenth, state.eleventh, state.twelfth, state.thirteenth); } catch (Exception exception) { state.subject.OnError(exception); return; } state.subject.OnNext(result); state.subject.OnCompleted(); }); return subject.AsObservable(); }; } public virtual Func> ToAsync(Func function) { return ToAsync(function, SchedulerDefaults.AsyncConversions); } public virtual Func> ToAsync(Func function, IScheduler scheduler) { return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth) => { var subject = new AsyncSubject(); scheduler.ScheduleAction((subject, function, first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth), state => { var result = default(TResult); try { result = state.function(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eight, state.ninth, state.tenth, state.eleventh, state.twelfth, state.thirteenth, state.fourteenth); } catch (Exception exception) { state.subject.OnError(exception); return; } state.subject.OnNext(result); state.subject.OnCompleted(); }); return subject.AsObservable(); }; } public virtual Func> ToAsync(Func function) { return 
ToAsync(function, SchedulerDefaults.AsyncConversions); } public virtual Func> ToAsync(Func function, IScheduler scheduler) { return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth) => { var subject = new AsyncSubject(); scheduler.ScheduleAction((subject, function, first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth), state => { var result = default(TResult); try { result = state.function(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eight, state.ninth, state.tenth, state.eleventh, state.twelfth, state.thirteenth, state.fourteenth, state.fifteenth); } catch (Exception exception) { state.subject.OnError(exception); return; } state.subject.OnNext(result); state.subject.OnCompleted(); }); return subject.AsObservable(); }; } public virtual Func> ToAsync(Func function) { return ToAsync(function, SchedulerDefaults.AsyncConversions); } public virtual Func> ToAsync(Func function, IScheduler scheduler) { return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth, sixteenth) => { var subject = new AsyncSubject(); scheduler.ScheduleAction((subject, function, first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth, sixteenth), state => { var result = default(TResult); try { result = state.function(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eight, state.ninth, state.tenth, state.eleventh, state.twelfth, state.thirteenth, state.fourteenth, state.fifteenth, state.sixteenth); } catch (Exception exception) { state.subject.OnError(exception); return; } state.subject.OnNext(result); state.subject.OnCompleted(); }); return subject.AsObservable(); }; } #endregion #region Action 
// Convenience overload: forwards to the ToAsync overload that takes a scheduler,
// passing SchedulerDefaults.AsyncConversions.
public virtual Func> ToAsync(Action action)
{
    return ToAsync(action, SchedulerDefaults.AsyncConversions);
}

// Parameterless form. The returned delegate, each time it is invoked, creates a new
// AsyncSubject, schedules one call to 'action' on 'scheduler', and hands back the
// subject through AsObservable().
public virtual Func> ToAsync(Action action, IScheduler scheduler)
{
    return () =>
    {
        var subject = new AsyncSubject();

        // The subject and the action are passed in the state tuple; the scheduled
        // callback reads them back only through 'state'.
        scheduler.ScheduleAction((subject, action), state =>
        {
            try
            {
                state.action();
            }
            catch (Exception exception)
            {
                // A throwing 'action' is reported via OnError; no value or completion follows.
                state.subject.OnError(exception);
                return;
            }

            // Success path: an action has no result, so Unit.Default is published before completing.
            state.subject.OnNext(Unit.Default);
            state.subject.OnCompleted();
        });

        return subject.AsObservable();
    };
}

// Convenience overload: forwards to the ToAsync overload that takes a scheduler,
// passing SchedulerDefaults.AsyncConversions.
public virtual Func> ToAsync(Action action)
{
    return ToAsync(action, SchedulerDefaults.AsyncConversions);
}

// One-argument form: the returned delegate schedules action(first) on 'scheduler' and
// reports completion (Unit.Default) or the thrown exception through a new AsyncSubject.
public virtual Func> ToAsync(Action action, IScheduler scheduler)
{
    return first =>
    {
        var subject = new AsyncSubject();
        scheduler.ScheduleAction((subject, action, first), state =>
        {
            try
            {
                state.action(state.first);
            }
            catch (Exception exception)
            {
                state.subject.OnError(exception);
                return;
            }

            state.subject.OnNext(Unit.Default);
            state.subject.OnCompleted();
        });

        return subject.AsObservable();
    };
}

// Convenience overload: forwards to the ToAsync overload that takes a scheduler,
// passing SchedulerDefaults.AsyncConversions.
public virtual Func> ToAsync(Action action)
{
    return ToAsync(action, SchedulerDefaults.AsyncConversions);
}

// Two-argument form: the returned delegate schedules one call to 'action' on 'scheduler' and
// reports completion (Unit.Default) or the thrown exception through a new AsyncSubject.
public virtual Func> ToAsync(Action action, IScheduler scheduler)
{
    return (first, second) =>
    {
        var subject = new AsyncSubject();
        scheduler.ScheduleAction((subject, action, first, second), state =>
        {
            try
            {
                state.action(state.first, state.second);
            }
            catch (Exception exception)
            {
                state.subject.OnError(exception);
                return;
            }

            state.subject.OnNext(Unit.Default);
            state.subject.OnCompleted();
        });

        return subject.AsObservable();
    };
}

// Convenience overload: forwards to the ToAsync overload that takes a scheduler,
// passing SchedulerDefaults.AsyncConversions.
public virtual Func> ToAsync(Action action)
{
    return ToAsync(action, SchedulerDefaults.AsyncConversions);
}

// Three-argument form: the returned delegate schedules one call to 'action' on 'scheduler' and
// reports completion (Unit.Default) or the thrown exception through a new AsyncSubject.
public virtual Func> ToAsync(Action action, IScheduler scheduler)
{
    return (first, second, third) =>
    {
        var subject = new AsyncSubject();
        scheduler.ScheduleAction((subject, action, first, second, third), state =>
        {
            try
            {
                state.action(state.first, state.second, state.third);
            }
            catch (Exception exception)
            {
                state.subject.OnError(exception);
                return;
            }

            state.subject.OnNext(Unit.Default);
            state.subject.OnCompleted();
        });

        return subject.AsObservable();
    };
}

// Convenience overload: forwards to the ToAsync overload that takes a scheduler,
// passing SchedulerDefaults.AsyncConversions.
public virtual Func> ToAsync(Action action)
{
    return ToAsync(action, SchedulerDefaults.AsyncConversions);
}

// Four-argument form: the returned delegate schedules one call to 'action' on 'scheduler' and
// reports completion (Unit.Default) or the thrown exception through a new AsyncSubject.
public virtual Func> ToAsync(Action action, IScheduler scheduler)
{
    return (first, second, third, fourth) =>
    {
        var subject = new AsyncSubject();
        scheduler.ScheduleAction((subject, action, first, second, third, fourth), state =>
        {
            try
            {
                state.action(state.first, state.second, state.third, state.fourth);
            }
            catch (Exception exception)
            {
                state.subject.OnError(exception);
                return;
            }

            state.subject.OnNext(Unit.Default);
            state.subject.OnCompleted();
        });

        return subject.AsObservable();
    };
}

// Convenience overload: forwards to the ToAsync overload that takes a scheduler,
// passing SchedulerDefaults.AsyncConversions.
public virtual Func> ToAsync(Action action)
{
    return ToAsync(action, SchedulerDefaults.AsyncConversions);
}

// Five-argument form: the returned delegate schedules one call to 'action' on 'scheduler' and
// reports completion (Unit.Default) or the thrown exception through a new AsyncSubject.
public virtual Func> ToAsync(Action action, IScheduler scheduler)
{
    return (first, second, third, fourth, fifth) =>
    {
        var subject = new AsyncSubject();
        scheduler.ScheduleAction((subject, action, first, second, third, fourth, fifth), state =>
        {
            try
            {
                state.action(state.first, state.second, state.third, state.fourth, state.fifth);
            }
            catch (Exception exception)
            {
                state.subject.OnError(exception);
                return;
            }

            state.subject.OnNext(Unit.Default);
            state.subject.OnCompleted();
        });

        return subject.AsObservable();
    };
}

// Convenience overload: forwards to the ToAsync overload that takes a scheduler,
// passing SchedulerDefaults.AsyncConversions.
public virtual Func> ToAsync(Action action)
{
    return ToAsync(action, SchedulerDefaults.AsyncConversions);
}

// Six-argument form: the returned delegate schedules one call to 'action' on 'scheduler' and
// reports completion (Unit.Default) or the thrown exception through a new AsyncSubject.
public virtual Func> ToAsync(Action action, IScheduler scheduler)
{
    return (first, second, third, fourth, fifth, sixth) =>
    {
        var subject = new AsyncSubject();
        scheduler.ScheduleAction((subject, action, first, second, third, fourth, fifth, sixth), state =>
        {
            try
            {
                state.action(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth);
            }
            catch (Exception exception)
            {
                state.subject.OnError(exception);
                return;
            }

            state.subject.OnNext(Unit.Default);
            state.subject.OnCompleted();
        });

        return subject.AsObservable();
    };
}

// Convenience overload: forwards to the ToAsync overload that takes a scheduler,
// passing SchedulerDefaults.AsyncConversions.
public virtual Func> ToAsync(Action action)
{
    return ToAsync(action, SchedulerDefaults.AsyncConversions);
}

// Seven-argument form: the returned delegate schedules one call to 'action' on 'scheduler' and
// reports completion (Unit.Default) or the thrown exception through a new AsyncSubject.
public virtual Func> ToAsync(Action action, IScheduler scheduler)
{
    return (first, second, third, fourth, fifth, sixth, seventh) =>
    {
        var subject = new AsyncSubject();
        scheduler.ScheduleAction((subject, action, first, second, third, fourth, fifth, sixth, seventh), state =>
        {
            try
            {
                state.action(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh);
            }
            catch (Exception exception)
            {
                state.subject.OnError(exception);
                return;
            }

            state.subject.OnNext(Unit.Default);
            state.subject.OnCompleted();
        });

        return subject.AsObservable();
    };
}

// Convenience overload: forwards to the ToAsync overload that takes a scheduler,
// passing SchedulerDefaults.AsyncConversions.
public virtual Func> ToAsync(Action action)
{
    return ToAsync(action, SchedulerDefaults.AsyncConversions);
}

// Eight-argument form: the returned delegate schedules one call to 'action' on 'scheduler' and
// reports completion (Unit.Default) or the thrown exception through a new AsyncSubject.
public virtual Func> ToAsync(Action action, IScheduler scheduler)
{
    return (first, second, third, fourth, fifth, sixth, seventh, eighth) =>
    {
        var subject = new AsyncSubject();
        scheduler.ScheduleAction((subject, action, first, second, third, fourth, fifth, sixth, seventh, eighth), state =>
        {
            try
            {
                state.action(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eighth);
            }
            catch (Exception exception)
            {
                state.subject.OnError(exception);
                return;
            }

            state.subject.OnNext(Unit.Default);
            state.subject.OnCompleted();
        });

        return subject.AsObservable();
    };
}

// Convenience overload: forwards to the ToAsync overload that takes a scheduler,
// passing SchedulerDefaults.AsyncConversions.
public virtual Func> ToAsync(Action action)
{
    return ToAsync(action, SchedulerDefaults.AsyncConversions);
}

// Nine-argument form: the returned delegate schedules one call to 'action' on 'scheduler' and
// reports completion (Unit.Default) or the thrown exception through a new AsyncSubject.
public virtual Func> ToAsync(Action action, IScheduler scheduler)
{
    return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth) =>
    {
        var subject = new AsyncSubject();
        scheduler.ScheduleAction((subject, action, first, second, third, fourth, fifth, sixth, seventh, eighth, ninth), state =>
        {
            try
            {
                state.action(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eighth, state.ninth);
            }
            catch (Exception exception)
            {
                state.subject.OnError(exception);
                return;
            }

            state.subject.OnNext(Unit.Default);
            state.subject.OnCompleted();
        });

        return subject.AsObservable();
    };
}

// Convenience overload: forwards to the ToAsync overload that takes a scheduler,
// passing SchedulerDefaults.AsyncConversions.
public virtual Func> ToAsync(Action action)
{
    return ToAsync(action, SchedulerDefaults.AsyncConversions);
}

// Ten-argument form: the returned delegate schedules one call to 'action' on 'scheduler' and
// reports completion (Unit.Default) or the thrown exception through a new AsyncSubject.
public virtual Func> ToAsync(Action action, IScheduler scheduler)
{
    return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth) =>
    {
        var subject = new AsyncSubject();
        scheduler.ScheduleAction((subject, action, first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth), state =>
        {
            try
            {
                state.action(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eighth, state.ninth, state.tenth);
            }
            catch (Exception exception)
            {
                state.subject.OnError(exception);
                return;
            }

            state.subject.OnNext(Unit.Default);
            state.subject.OnCompleted();
        });

        return subject.AsObservable();
    };
}

// Convenience overload: forwards to the ToAsync overload that takes a scheduler,
// passing SchedulerDefaults.AsyncConversions.
public virtual Func> ToAsync(Action action)
{
    return ToAsync(action, SchedulerDefaults.AsyncConversions);
}

// Eleven-argument form: the returned delegate schedules one call to 'action' on 'scheduler' and
// reports completion (Unit.Default) or the thrown exception through a new AsyncSubject.
public virtual Func> ToAsync(Action action, IScheduler scheduler)
{
    return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh) =>
    {
        var subject = new AsyncSubject();
        scheduler.ScheduleAction((subject, action, first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh), state =>
        {
            try
            {
                state.action(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eighth, state.ninth, state.tenth, state.eleventh);
            }
            catch (Exception exception)
            {
                state.subject.OnError(exception);
                return;
            }

            state.subject.OnNext(Unit.Default);
            state.subject.OnCompleted();
        });

        return subject.AsObservable();
    };
}

// Convenience overload: forwards to the ToAsync overload that takes a scheduler,
// passing SchedulerDefaults.AsyncConversions.
public virtual Func> ToAsync(Action action)
{
    return ToAsync(action, SchedulerDefaults.AsyncConversions);
}

// Twelve-argument form: the returned delegate schedules one call to 'action' on 'scheduler' and
// reports completion (Unit.Default) or the thrown exception through a new AsyncSubject.
public virtual Func> ToAsync(Action action, IScheduler scheduler)
{
    return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth) =>
    {
        var subject = new AsyncSubject();
        scheduler.ScheduleAction((subject, action, first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth), state =>
        {
            try
            {
                state.action(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eighth, state.ninth, state.tenth, state.eleventh, state.twelfth);
            }
            catch (Exception exception)
            {
                state.subject.OnError(exception);
                return;
            }

            state.subject.OnNext(Unit.Default);
            state.subject.OnCompleted();
        });

        return subject.AsObservable();
    };
}

// Convenience overload: forwards to the ToAsync overload that takes a scheduler,
// passing SchedulerDefaults.AsyncConversions.
public virtual Func> ToAsync(Action action)
{
    return ToAsync(action, SchedulerDefaults.AsyncConversions);
}

// Thirteen-argument form: the returned delegate schedules one call to 'action' on 'scheduler' and
// reports completion (Unit.Default) or the thrown exception through a new AsyncSubject.
public virtual Func> ToAsync(Action action, IScheduler scheduler)
{
    return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth) =>
    {
        var subject = new AsyncSubject();
        scheduler.ScheduleAction((subject, action, first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth), state =>
        {
            try
            {
                state.action(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eighth, state.ninth, state.tenth, state.eleventh, state.twelfth, state.thirteenth);
            }
            catch (Exception exception)
            {
                state.subject.OnError(exception);
                return;
            }

            state.subject.OnNext(Unit.Default);
            state.subject.OnCompleted();
        });

        return subject.AsObservable();
    };
}

// Convenience overload: forwards to the ToAsync overload that takes a scheduler,
// passing SchedulerDefaults.AsyncConversions.
public virtual Func> ToAsync(Action action)
{
    return ToAsync(action, SchedulerDefaults.AsyncConversions);
}

// Fourteen-argument form: the returned delegate schedules one call to 'action' on 'scheduler' and
// reports completion (Unit.Default) or the thrown exception through a new AsyncSubject.
public virtual Func> ToAsync(Action action, IScheduler scheduler)
{
    return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth) =>
    {
        var subject = new AsyncSubject();
        scheduler.ScheduleAction((subject, action, first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth), state =>
        {
            try
            {
                state.action(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eighth, state.ninth, state.tenth, state.eleventh, state.twelfth, state.thirteenth, state.fourteenth);
            }
            catch (Exception exception)
            {
                state.subject.OnError(exception);
                return;
            }

            state.subject.OnNext(Unit.Default);
            state.subject.OnCompleted();
        });

        return subject.AsObservable();
    };
}

// Convenience overload: forwards to the ToAsync overload that takes a scheduler,
// passing SchedulerDefaults.AsyncConversions.
public virtual Func> ToAsync(Action action)
{
    return ToAsync(action, SchedulerDefaults.AsyncConversions);
}

// Fifteen-argument form: the returned delegate schedules one call to 'action' on 'scheduler' and
// reports completion (Unit.Default) or the thrown exception through a new AsyncSubject.
public virtual Func> ToAsync(Action action, IScheduler scheduler)
{
    return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth) =>
    {
        var subject = new AsyncSubject();
        scheduler.ScheduleAction((subject, action, first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth), state =>
        {
            try
            {
                state.action(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eighth, state.ninth, state.tenth, state.eleventh, state.twelfth, state.thirteenth, state.fourteenth, state.fifteenth);
            }
            catch (Exception exception)
            {
                state.subject.OnError(exception);
                return;
            }

            state.subject.OnNext(Unit.Default);
            state.subject.OnCompleted();
        });

        return subject.AsObservable();
    };
}

// Convenience overload: forwards to the ToAsync overload that takes a scheduler,
// passing SchedulerDefaults.AsyncConversions.
public virtual Func> ToAsync(Action action)
{
    return ToAsync(action, SchedulerDefaults.AsyncConversions);
}

// Sixteen-argument form: the returned delegate schedules one call to 'action' on 'scheduler' and
// reports completion (Unit.Default) or the thrown exception through a new AsyncSubject.
public virtual Func> ToAsync(Action action, IScheduler scheduler)
{
    return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth, sixteenth) =>
    {
        var subject = new AsyncSubject();
        scheduler.ScheduleAction((subject, action, first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth, sixteenth), state =>
        {
            try
            {
                state.action(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eighth, state.ninth, state.tenth, state.eleventh, state.twelfth, state.thirteenth, state.fourteenth, state.fifteenth, state.sixteenth);
            }
            catch (Exception exception)
            {
                state.subject.OnError(exception);
                return;
            }

            state.subject.OnNext(Unit.Default);
            state.subject.OnCompleted();
        });

        return subject.AsObservable();
    };
}

#endregion

#endregion
}
}