123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- // Licensed to the .NET Foundation under one or more agreements.
- // The .NET Foundation licenses this file to you under the Apache 2.0 License.
- // See the LICENSE file in the project root for more information.
- <#@ template debug="false" hostspecific="false" language="C#" #>
- <#@ assembly name="System.Core" #>
- <#@ import namespace="System.Linq" #>
- <#@ import namespace="System.Text" #>
- <#@ import namespace="System.Collections.Generic" #>
- <#@ output extension=".cs" #>
- using System.Reactive.Concurrency;
- using System.Reactive.Subjects;
- namespace System.Reactive.Linq
- {
- // REVIEW: Consider if these are worth retaining in the async space.
- partial class AsyncObservable
- {
- <#
- for (var i = 0; i <= 16; i++)
- {
- // Type parameter names T1..Ti of the overload being generated (empty when i == 0).
- var typeParams = Enumerable.Range(1, i).Select(n => "T" + n).ToList();
- // "arg1, ..., argi": used both as the returned lambda's parameter list and as the argument list of the wrapped call.
- var pars = string.Join(", ", Enumerable.Range(1, i).Select(n => "arg" + n));
- // Generic parameter list of the generated method: "T1, ..., Ti, TResult".
- var genArgs = string.Join(", ", typeParams.Concat(new[] { "TResult" }));
- // Delegate type accepted by the generated method: Func<T1, ..., Ti, TResult>.
- var type = "Func<" + genArgs + ">";
- // Delegate type returned by the generated method: Func<T1, ..., Ti, IAsyncObservable<TResult>>.
- var ret = "Func<" + string.Join(", ", typeParams.Concat(new[] { "IAsyncObservable<TResult>" })) + ">";
- #>
- /// <summary>
- /// Wraps <paramref name="function"/> in a delegate that returns an asynchronous observable, using
- /// <c>TaskPoolAsyncScheduler.Default</c> as the scheduler passed to the scheduler-taking overload.
- /// </summary>
- /// <typeparam name="TResult">Return type of <paramref name="function"/>.</typeparam>
- /// <param name="function">Synchronous function to wrap.</param>
- /// <returns>The delegate produced by the overload that takes an explicit scheduler.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="function"/> is <c>null</c>.</exception>
- public static <#=ret#> ToAsync<<#=genArgs#>>(<#=type#> function)
- {
-     if (function == null)
-     {
-         throw new ArgumentNullException(nameof(function));
-     }
-     var defaultScheduler = TaskPoolAsyncScheduler.Default;
-     return ToAsync(function, defaultScheduler);
- }
- /// <summary>
- /// Wraps <paramref name="function"/> in a delegate that, on every invocation, creates a new
- /// <c>SequentialAsyncAsyncSubject&lt;TResult&gt;</c>, schedules the call to <paramref name="function"/> on
- /// <paramref name="scheduler"/>, and returns the subject as an asynchronous observable without waiting for the call.
- /// </summary>
- /// <remarks>
- /// The scheduled work reports the function's return value through <c>OnNextAsync</c> followed by
- /// <c>OnCompletedAsync</c>. If the function throws, the exception is reported through <c>OnErrorAsync</c> instead.
- /// </remarks>
- /// <typeparam name="TResult">Return type of <paramref name="function"/>.</typeparam>
- /// <param name="function">Synchronous function to wrap.</param>
- /// <param name="scheduler">Scheduler on which the function is invoked and on which the subject notifications rendezvous.</param>
- /// <returns>A delegate with the same parameters as <paramref name="function"/> that returns an asynchronous observable.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="function"/> or <paramref name="scheduler"/> is <c>null</c>.</exception>
- public static <#=ret#> ToAsync<<#=genArgs#>>(<#=type#> function, IAsyncScheduler scheduler)
- {
-     if (function == null)
-     {
-         throw new ArgumentNullException(nameof(function));
-     }
-     if (scheduler == null)
-     {
-         throw new ArgumentNullException(nameof(scheduler));
-     }
-     <#=ret#> converter = (<#=pars#>) =>
-     {
-         var result = new SequentialAsyncAsyncSubject<TResult>();
-         // NB: The value returned by ScheduleAsync is intentionally dropped; the scheduled call can't be cancelled.
-         scheduler.ScheduleAsync(async token =>
-         {
-             TResult value;
-             try
-             {
-                 value = function(<#=pars#>);
-             }
-             catch (Exception error)
-             {
-                 // Only failures of the wrapped function itself are turned into an error notification.
-                 await result.OnErrorAsync(error).RendezVous(scheduler);
-                 return;
-             }
-             await result.OnNextAsync(value).RendezVous(scheduler);
-             await result.OnCompletedAsync().RendezVous(scheduler);
-         });
-         return result.AsAsyncObservable();
-     };
-     return converter;
- }
- <#
- }
- #>
- <#
- for (var i = 0; i <= 16; i++)
- {
- // Type parameter names T1..Ti of the overload being generated (empty when i == 0).
- var typeParams = Enumerable.Range(1, i).Select(n => "T" + n).ToList();
- // "arg1, ..., argi": used both as the returned lambda's parameter list and as the argument list of the wrapped call.
- var pars = string.Join(", ", Enumerable.Range(1, i).Select(n => "arg" + n));
- // Delegate type returned by the generated method: Func<T1, ..., Ti, IAsyncObservable<Unit>>.
- var ret = "Func<" + string.Join(", ", typeParams.Concat(new[] { "IAsyncObservable<Unit>" })) + ">";
- // "<T1, ..., Ti>" including the angle brackets, or empty for the non-generic Action overload (i == 0).
- var genArgs = typeParams.Count > 0 ? "<" + string.Join(", ", typeParams) + ">" : "";
- // Delegate type accepted by the generated method: Action or Action<T1, ..., Ti>.
- var type = "Action" + genArgs;
- #>
- /// <summary>
- /// Wraps <paramref name="action"/> in a delegate that returns an asynchronous observable, using
- /// <c>TaskPoolAsyncScheduler.Default</c> as the scheduler passed to the scheduler-taking overload.
- /// </summary>
- /// <param name="action">Synchronous action to wrap.</param>
- /// <returns>The delegate produced by the overload that takes an explicit scheduler.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
- public static <#=ret#> ToAsync<#=genArgs#>(<#=type#> action)
- {
-     if (action == null)
-     {
-         throw new ArgumentNullException(nameof(action));
-     }
-     var defaultScheduler = TaskPoolAsyncScheduler.Default;
-     return ToAsync(action, defaultScheduler);
- }
- /// <summary>
- /// Wraps <paramref name="action"/> in a delegate that, on every invocation, creates a new
- /// <c>SequentialAsyncAsyncSubject&lt;Unit&gt;</c>, schedules the call to <paramref name="action"/> on
- /// <paramref name="scheduler"/>, and returns the subject as an asynchronous observable without waiting for the call.
- /// </summary>
- /// <remarks>
- /// The scheduled work reports <c>Unit.Default</c> through <c>OnNextAsync</c> followed by <c>OnCompletedAsync</c>
- /// once the action returns. If the action throws, the exception is reported through <c>OnErrorAsync</c> instead.
- /// </remarks>
- /// <param name="action">Synchronous action to wrap.</param>
- /// <param name="scheduler">Scheduler on which the action is invoked and on which the subject notifications rendezvous.</param>
- /// <returns>A delegate with the same parameters as <paramref name="action"/> that returns an asynchronous observable.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="action"/> or <paramref name="scheduler"/> is <c>null</c>.</exception>
- public static <#=ret#> ToAsync<#=genArgs#>(<#=type#> action, IAsyncScheduler scheduler)
- {
-     if (action == null)
-     {
-         throw new ArgumentNullException(nameof(action));
-     }
-     if (scheduler == null)
-     {
-         throw new ArgumentNullException(nameof(scheduler));
-     }
-     <#=ret#> converter = (<#=pars#>) =>
-     {
-         var result = new SequentialAsyncAsyncSubject<Unit>();
-         // NB: The value returned by ScheduleAsync is intentionally dropped; the scheduled call can't be cancelled.
-         scheduler.ScheduleAsync(async token =>
-         {
-             try
-             {
-                 action(<#=pars#>);
-             }
-             catch (Exception error)
-             {
-                 // Only failures of the wrapped action itself are turned into an error notification.
-                 await result.OnErrorAsync(error).RendezVous(scheduler);
-                 return;
-             }
-             await result.OnNextAsync(Unit.Default).RendezVous(scheduler);
-             await result.OnCompletedAsync().RendezVous(scheduler);
-         });
-         return result.AsAsyncObservable();
-     };
-     return converter;
- }
- <#
- }
- #>
- }
- }
|