// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Reactive.Concurrency;
using System.Threading.Tasks;

namespace System.Reactive.Linq
{
    public partial class AsyncObservable
    {
        /// <summary>
        /// Builds a sequence from a state-driven loop using synchronous delegates.
        /// The observer handed to <c>Create</c> is forwarded to the matching
        /// <see cref="AsyncObserver"/> overload that takes no scheduler.
        /// </summary>
        /// <typeparam name="TState">Type of the loop state.</typeparam>
        /// <typeparam name="TResult">Type of the produced elements.</typeparam>
        /// <param name="initialState">State used for the first pass of the loop.</param>
        /// <param name="condition">Returns <c>true</c> while the loop should keep producing elements.</param>
        /// <param name="iterate">Computes the next state from the current one.</param>
        /// <param name="resultSelector">Maps the current state to the element to emit.</param>
        /// <exception cref="ArgumentNullException">A delegate argument is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector)
        {
            if (condition == null)
                throw new ArgumentNullException(nameof(condition));
            if (iterate == null)
                throw new ArgumentNullException(nameof(iterate));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));

            return Create<TResult>(observer => AsyncObserver.Generate(observer, initialState, condition, iterate, resultSelector));
        }

        /// <summary>
        /// Same as the synchronous-delegate overload, but the loop is run on the supplied <paramref name="scheduler"/>.
        /// </summary>
        /// <exception cref="ArgumentNullException">A delegate argument or <paramref name="scheduler"/> is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, IAsyncScheduler scheduler)
        {
            if (condition == null)
                throw new ArgumentNullException(nameof(condition));
            if (iterate == null)
                throw new ArgumentNullException(nameof(iterate));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return Create<TResult>(observer => AsyncObserver.Generate(observer, initialState, condition, iterate, resultSelector, scheduler));
        }

        /// <summary>
        /// Builds a sequence from a state-driven loop whose delegates return <c>ValueTask</c> results.
        /// Forwards to the matching <see cref="AsyncObserver"/> overload that takes no scheduler.
        /// </summary>
        /// <exception cref="ArgumentNullException">A delegate argument is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, ValueTask<bool>> condition, Func<TState, ValueTask<TState>> iterate, Func<TState, ValueTask<TResult>> resultSelector)
        {
            if (condition == null)
                throw new ArgumentNullException(nameof(condition));
            if (iterate == null)
                throw new ArgumentNullException(nameof(iterate));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));

            return Create<TResult>(observer => AsyncObserver.Generate(observer, initialState, condition, iterate, resultSelector));
        }

        /// <summary>
        /// Asynchronous-delegate overload that runs the loop on the supplied <paramref name="scheduler"/>.
        /// </summary>
        /// <exception cref="ArgumentNullException">A delegate argument or <paramref name="scheduler"/> is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, ValueTask<bool>> condition, Func<TState, ValueTask<TState>> iterate, Func<TState, ValueTask<TResult>> resultSelector, IAsyncScheduler scheduler)
        {
            if (condition == null)
                throw new ArgumentNullException(nameof(condition));
            if (iterate == null)
                throw new ArgumentNullException(nameof(iterate));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return Create<TResult>(observer => AsyncObserver.Generate(observer, initialState, condition, iterate, resultSelector, scheduler));
        }

        /// <summary>
        /// Builds a paced sequence: <paramref name="timeSelector"/> yields a relative <see cref="TimeSpan"/>
        /// for each state. Synchronous delegates, no scheduler argument.
        /// </summary>
        /// <param name="initialState">State used for the first pass of the loop.</param>
        /// <param name="condition">Returns <c>true</c> while the loop should keep producing elements.</param>
        /// <param name="iterate">Computes the next state from the current one.</param>
        /// <param name="resultSelector">Maps the current state to the element to emit.</param>
        /// <param name="timeSelector">Maps the current state to the delay handed to the scheduler.</param>
        /// <exception cref="ArgumentNullException">A delegate argument is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector)
        {
            if (condition == null)
                throw new ArgumentNullException(nameof(condition));
            if (iterate == null)
                throw new ArgumentNullException(nameof(iterate));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));
            if (timeSelector == null)
                throw new ArgumentNullException(nameof(timeSelector));

            return Create<TResult>(observer => AsyncObserver.Generate(observer, initialState, condition, iterate, resultSelector, timeSelector));
        }

        /// <summary>
        /// Relative-time, synchronous-delegate overload that runs on the supplied <paramref name="scheduler"/>.
        /// </summary>
        /// <exception cref="ArgumentNullException">A delegate argument or <paramref name="scheduler"/> is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IAsyncScheduler scheduler)
        {
            if (condition == null)
                throw new ArgumentNullException(nameof(condition));
            if (iterate == null)
                throw new ArgumentNullException(nameof(iterate));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));
            if (timeSelector == null)
                throw new ArgumentNullException(nameof(timeSelector));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return Create<TResult>(observer => AsyncObserver.Generate(observer, initialState, condition, iterate, resultSelector, timeSelector, scheduler));
        }

        /// <summary>
        /// Relative-time overload whose delegates return <c>ValueTask</c> results; no scheduler argument.
        /// </summary>
        /// <exception cref="ArgumentNullException">A delegate argument is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, ValueTask<bool>> condition, Func<TState, ValueTask<TState>> iterate, Func<TState, ValueTask<TResult>> resultSelector, Func<TState, ValueTask<TimeSpan>> timeSelector)
        {
            if (condition == null)
                throw new ArgumentNullException(nameof(condition));
            if (iterate == null)
                throw new ArgumentNullException(nameof(iterate));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));
            if (timeSelector == null)
                throw new ArgumentNullException(nameof(timeSelector));

            return Create<TResult>(observer => AsyncObserver.Generate(observer, initialState, condition, iterate, resultSelector, timeSelector));
        }

        /// <summary>
        /// Relative-time, asynchronous-delegate overload that runs on the supplied <paramref name="scheduler"/>.
        /// </summary>
        /// <exception cref="ArgumentNullException">A delegate argument or <paramref name="scheduler"/> is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, ValueTask<bool>> condition, Func<TState, ValueTask<TState>> iterate, Func<TState, ValueTask<TResult>> resultSelector, Func<TState, ValueTask<TimeSpan>> timeSelector, IAsyncScheduler scheduler)
        {
            if (condition == null)
                throw new ArgumentNullException(nameof(condition));
            if (iterate == null)
                throw new ArgumentNullException(nameof(iterate));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));
            if (timeSelector == null)
                throw new ArgumentNullException(nameof(timeSelector));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return Create<TResult>(observer => AsyncObserver.Generate(observer, initialState, condition, iterate, resultSelector, timeSelector, scheduler));
        }

        /// <summary>
        /// Builds a paced sequence: <paramref name="timeSelector"/> yields an absolute <see cref="DateTimeOffset"/>
        /// for each state. Synchronous delegates, no scheduler argument.
        /// </summary>
        /// <exception cref="ArgumentNullException">A delegate argument is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector)
        {
            if (condition == null)
                throw new ArgumentNullException(nameof(condition));
            if (iterate == null)
                throw new ArgumentNullException(nameof(iterate));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));
            if (timeSelector == null)
                throw new ArgumentNullException(nameof(timeSelector));

            return Create<TResult>(observer => AsyncObserver.Generate(observer, initialState, condition, iterate, resultSelector, timeSelector));
        }

        /// <summary>
        /// Absolute-time, synchronous-delegate overload that runs on the supplied <paramref name="scheduler"/>.
        /// </summary>
        /// <exception cref="ArgumentNullException">A delegate argument or <paramref name="scheduler"/> is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector, IAsyncScheduler scheduler)
        {
            if (condition == null)
                throw new ArgumentNullException(nameof(condition));
            if (iterate == null)
                throw new ArgumentNullException(nameof(iterate));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));
            if (timeSelector == null)
                throw new ArgumentNullException(nameof(timeSelector));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return Create<TResult>(observer => AsyncObserver.Generate(observer, initialState, condition, iterate, resultSelector, timeSelector, scheduler));
        }

        /// <summary>
        /// Absolute-time overload whose delegates return <c>ValueTask</c> results; no scheduler argument.
        /// </summary>
        /// <exception cref="ArgumentNullException">A delegate argument is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, ValueTask<bool>> condition, Func<TState, ValueTask<TState>> iterate, Func<TState, ValueTask<TResult>> resultSelector, Func<TState, ValueTask<DateTimeOffset>> timeSelector)
        {
            if (condition == null)
                throw new ArgumentNullException(nameof(condition));
            if (iterate == null)
                throw new ArgumentNullException(nameof(iterate));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));
            if (timeSelector == null)
                throw new ArgumentNullException(nameof(timeSelector));

            return Create<TResult>(observer => AsyncObserver.Generate(observer, initialState, condition, iterate, resultSelector, timeSelector));
        }

        /// <summary>
        /// Absolute-time, asynchronous-delegate overload that runs on the supplied <paramref name="scheduler"/>.
        /// </summary>
        /// <exception cref="ArgumentNullException">A delegate argument or <paramref name="scheduler"/> is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, ValueTask<bool>> condition, Func<TState, ValueTask<TState>> iterate, Func<TState, ValueTask<TResult>> resultSelector, Func<TState, ValueTask<DateTimeOffset>> timeSelector, IAsyncScheduler scheduler)
        {
            if (condition == null)
                throw new ArgumentNullException(nameof(condition));
            if (iterate == null)
                throw new ArgumentNullException(nameof(iterate));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));
            if (timeSelector == null)
                throw new ArgumentNullException(nameof(timeSelector));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return Create<TResult>(observer => AsyncObserver.Generate(observer, initialState, condition, iterate, resultSelector, timeSelector, scheduler));
        }
    }

    public partial class AsyncObserver
    {
        /// <summary>
        /// Drives <paramref name="observer"/> from a state loop with synchronous delegates.
        /// Each delegate is wrapped into a completed <c>ValueTask</c> and the call is forwarded to the
        /// asynchronous overload with <c>TaskPoolAsyncScheduler.Default</c>.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or a delegate argument is <c>null</c>.</exception>
        public static ValueTask<IAsyncDisposable> Generate<TState, TResult>(IAsyncObserver<TResult> observer, TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (condition == null)
                throw new ArgumentNullException(nameof(condition));
            if (iterate == null)
                throw new ArgumentNullException(nameof(iterate));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));

            return Generate<TState, TResult>(observer, initialState, s => new ValueTask<bool>(condition(s)), s => new ValueTask<TState>(iterate(s)), s => new ValueTask<TResult>(resultSelector(s)), TaskPoolAsyncScheduler.Default);
        }

        /// <summary>
        /// Synchronous-delegate overload with an explicit <paramref name="scheduler"/>; wraps each delegate
        /// into a completed <c>ValueTask</c> and forwards to the asynchronous overload.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/>, a delegate argument or <paramref name="scheduler"/> is <c>null</c>.</exception>
        public static ValueTask<IAsyncDisposable> Generate<TState, TResult>(IAsyncObserver<TResult> observer, TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, IAsyncScheduler scheduler)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (condition == null)
                throw new ArgumentNullException(nameof(condition));
            if (iterate == null)
                throw new ArgumentNullException(nameof(iterate));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return Generate<TState, TResult>(observer, initialState, s => new ValueTask<bool>(condition(s)), s => new ValueTask<TState>(iterate(s)), s => new ValueTask<TResult>(resultSelector(s)), scheduler);
        }

        /// <summary>
        /// Asynchronous-delegate overload without a scheduler argument; forwards to the scheduler overload
        /// with <c>TaskPoolAsyncScheduler.Default</c>.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or a delegate argument is <c>null</c>.</exception>
        public static ValueTask<IAsyncDisposable> Generate<TState, TResult>(IAsyncObserver<TResult> observer, TState initialState, Func<TState, ValueTask<bool>> condition, Func<TState, ValueTask<TState>> iterate, Func<TState, ValueTask<TResult>> resultSelector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (condition == null)
                throw new ArgumentNullException(nameof(condition));
            if (iterate == null)
                throw new ArgumentNullException(nameof(iterate));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));

            return Generate(observer, initialState, condition, iterate, resultSelector, TaskPoolAsyncScheduler.Default);
        }

        /// <summary>
        /// Core untimed loop. Schedules work on <paramref name="scheduler"/> that starts from
        /// <paramref name="initialState"/>, applies <paramref name="iterate"/> on every pass except the first,
        /// and emits <paramref name="resultSelector"/>'s value while <paramref name="condition"/> holds.
        /// </summary>
        /// <param name="observer">Receives <c>OnNextAsync</c>, <c>OnErrorAsync</c> and <c>OnCompletedAsync</c> calls.</param>
        /// <param name="initialState">State used for the first pass of the loop.</param>
        /// <param name="condition">Returns <c>true</c> while the loop should keep producing elements.</param>
        /// <param name="iterate">Computes the next state from the current one.</param>
        /// <param name="resultSelector">Maps the current state to the element to emit.</param>
        /// <param name="scheduler">Scheduler the loop is submitted to via <c>ScheduleAsync</c>.</param>
        /// <returns>The value returned by <c>scheduler.ScheduleAsync</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/>, a delegate argument or <paramref name="scheduler"/> is <c>null</c>.</exception>
        public static ValueTask<IAsyncDisposable> Generate<TState, TResult>(IAsyncObserver<TResult> observer, TState initialState, Func<TState, ValueTask<bool>> condition, Func<TState, ValueTask<TState>> iterate, Func<TState, ValueTask<TResult>> resultSelector, IAsyncScheduler scheduler)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (condition == null)
                throw new ArgumentNullException(nameof(condition));
            if (iterate == null)
                throw new ArgumentNullException(nameof(iterate));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return scheduler.ScheduleAsync(async ct =>
            {
                var first = true;
                var state = initialState;

                while (!ct.IsCancellationRequested)
                {
                    var hasResult = false;
                    var result = default(TResult);

                    // Only the user-supplied delegates run inside the try block, so only their
                    // failures are routed to OnErrorAsync.
                    try
                    {
                        if (first)
                        {
                            // The initial state is emitted as-is; iterate is skipped on the first pass.
                            first = false;
                        }
                        else
                        {
                            state = await iterate(state).RendezVous(scheduler, ct);
                        }

                        hasResult = await condition(state).RendezVous(scheduler, ct);

                        if (hasResult)
                        {
                            result = await resultSelector(state).RendezVous(scheduler, ct);
                        }
                    }
                    catch (Exception ex)
                    {
                        await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                        return;
                    }

                    if (hasResult)
                    {
                        await observer.OnNextAsync(result).RendezVous(scheduler, ct);
                    }
                    else
                    {
                        break;
                    }
                }

                // Completion is signalled only when the loop was not stopped by cancellation.
                if (!ct.IsCancellationRequested)
                {
                    await observer.OnCompletedAsync().RendezVous(scheduler, ct);
                }
            });
        }

        /// <summary>
        /// Relative-time, synchronous-delegate overload without a scheduler argument; wraps each delegate
        /// into a completed <c>ValueTask</c> and forwards with <c>TaskPoolAsyncScheduler.Default</c>.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or a delegate argument is <c>null</c>.</exception>
        public static ValueTask<IAsyncDisposable> Generate<TState, TResult>(IAsyncObserver<TResult> observer, TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (condition == null)
                throw new ArgumentNullException(nameof(condition));
            if (iterate == null)
                throw new ArgumentNullException(nameof(iterate));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));
            if (timeSelector == null)
                throw new ArgumentNullException(nameof(timeSelector));

            return Generate<TState, TResult>(observer, initialState, s => new ValueTask<bool>(condition(s)), s => new ValueTask<TState>(iterate(s)), s => new ValueTask<TResult>(resultSelector(s)), s => new ValueTask<TimeSpan>(timeSelector(s)), TaskPoolAsyncScheduler.Default);
        }

        /// <summary>
        /// Relative-time, synchronous-delegate overload with an explicit <paramref name="scheduler"/>; wraps each
        /// delegate into a completed <c>ValueTask</c> and forwards to the asynchronous overload.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/>, a delegate argument or <paramref name="scheduler"/> is <c>null</c>.</exception>
        public static ValueTask<IAsyncDisposable> Generate<TState, TResult>(IAsyncObserver<TResult> observer, TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IAsyncScheduler scheduler)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (condition == null)
                throw new ArgumentNullException(nameof(condition));
            if (iterate == null)
                throw new ArgumentNullException(nameof(iterate));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));
            if (timeSelector == null)
                throw new ArgumentNullException(nameof(timeSelector));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return Generate<TState, TResult>(observer, initialState, s => new ValueTask<bool>(condition(s)), s => new ValueTask<TState>(iterate(s)), s => new ValueTask<TResult>(resultSelector(s)), s => new ValueTask<TimeSpan>(timeSelector(s)), scheduler);
        }

        /// <summary>
        /// Relative-time, asynchronous-delegate overload without a scheduler argument; forwards to the
        /// scheduler overload with <c>TaskPoolAsyncScheduler.Default</c>.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or a delegate argument is <c>null</c>.</exception>
        public static ValueTask<IAsyncDisposable> Generate<TState, TResult>(IAsyncObserver<TResult> observer, TState initialState, Func<TState, ValueTask<bool>> condition, Func<TState, ValueTask<TState>> iterate, Func<TState, ValueTask<TResult>> resultSelector, Func<TState, ValueTask<TimeSpan>> timeSelector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (condition == null)
                throw new ArgumentNullException(nameof(condition));
            if (iterate == null)
                throw new ArgumentNullException(nameof(iterate));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));
            if (timeSelector == null)
                throw new ArgumentNullException(nameof(timeSelector));

            return Generate(observer, initialState, condition, iterate, resultSelector, timeSelector, TaskPoolAsyncScheduler.Default);
        }

        /// <summary>
        /// Core relative-time loop. Behaves like the untimed loop, and additionally evaluates
        /// <paramref name="timeSelector"/> for every emitted state and awaits <c>scheduler.Delay</c> with
        /// that <see cref="TimeSpan"/> after the element has been sent.
        /// </summary>
        /// <param name="observer">Receives <c>OnNextAsync</c>, <c>OnErrorAsync</c> and <c>OnCompletedAsync</c> calls.</param>
        /// <param name="initialState">State used for the first pass of the loop.</param>
        /// <param name="condition">Returns <c>true</c> while the loop should keep producing elements.</param>
        /// <param name="iterate">Computes the next state from the current one.</param>
        /// <param name="resultSelector">Maps the current state to the element to emit.</param>
        /// <param name="timeSelector">Maps the current state to the delay awaited after the element is emitted.</param>
        /// <param name="scheduler">Scheduler the loop is submitted to and that provides <c>Delay</c>.</param>
        /// <returns>The value returned by <c>scheduler.ScheduleAsync</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/>, a delegate argument or <paramref name="scheduler"/> is <c>null</c>.</exception>
        public static ValueTask<IAsyncDisposable> Generate<TState, TResult>(IAsyncObserver<TResult> observer, TState initialState, Func<TState, ValueTask<bool>> condition, Func<TState, ValueTask<TState>> iterate, Func<TState, ValueTask<TResult>> resultSelector, Func<TState, ValueTask<TimeSpan>> timeSelector, IAsyncScheduler scheduler)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (condition == null)
                throw new ArgumentNullException(nameof(condition));
            if (iterate == null)
                throw new ArgumentNullException(nameof(iterate));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));
            if (timeSelector == null)
                throw new ArgumentNullException(nameof(timeSelector));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return scheduler.ScheduleAsync(async ct =>
            {
                var first = true;
                var state = initialState;

                while (!ct.IsCancellationRequested)
                {
                    var hasResult = false;
                    var result = default(TResult);
                    var nextDue = default(TimeSpan);

                    // Only the user-supplied delegates run inside the try block, so only their
                    // failures are routed to OnErrorAsync.
                    try
                    {
                        if (first)
                        {
                            // The initial state is emitted as-is; iterate is skipped on the first pass.
                            first = false;
                        }
                        else
                        {
                            state = await iterate(state).RendezVous(scheduler, ct);
                        }

                        hasResult = await condition(state).RendezVous(scheduler, ct);

                        if (hasResult)
                        {
                            result = await resultSelector(state).RendezVous(scheduler, ct);
                            nextDue = await timeSelector(state).RendezVous(scheduler, ct);
                        }
                    }
                    catch (Exception ex)
                    {
                        await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                        return;
                    }

                    if (hasResult)
                    {
                        await observer.OnNextAsync(result).RendezVous(scheduler, ct);
                    }
                    else
                    {
                        break;
                    }

                    // NOTE(review): the wait happens after the element is emitted, so the first element
                    // is sent without any delay - confirm this ordering is intended.
                    await scheduler.Delay(nextDue).RendezVous(scheduler, ct);
                }

                // Completion is signalled only when the loop was not stopped by cancellation.
                if (!ct.IsCancellationRequested)
                {
                    await observer.OnCompletedAsync().RendezVous(scheduler, ct);
                }
            });
        }

        /// <summary>
        /// Absolute-time, synchronous-delegate overload without a scheduler argument; wraps each delegate
        /// into a completed <c>ValueTask</c> and forwards with <c>TaskPoolAsyncScheduler.Default</c>.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or a delegate argument is <c>null</c>.</exception>
        public static ValueTask<IAsyncDisposable> Generate<TState, TResult>(IAsyncObserver<TResult> observer, TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (condition == null)
                throw new ArgumentNullException(nameof(condition));
            if (iterate == null)
                throw new ArgumentNullException(nameof(iterate));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));
            if (timeSelector == null)
                throw new ArgumentNullException(nameof(timeSelector));

            return Generate<TState, TResult>(observer, initialState, s => new ValueTask<bool>(condition(s)), s => new ValueTask<TState>(iterate(s)), s => new ValueTask<TResult>(resultSelector(s)), s => new ValueTask<DateTimeOffset>(timeSelector(s)), TaskPoolAsyncScheduler.Default);
        }

        /// <summary>
        /// Absolute-time, synchronous-delegate overload with an explicit <paramref name="scheduler"/>; wraps each
        /// delegate into a completed <c>ValueTask</c> and forwards to the asynchronous overload.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/>, a delegate argument or <paramref name="scheduler"/> is <c>null</c>.</exception>
        public static ValueTask<IAsyncDisposable> Generate<TState, TResult>(IAsyncObserver<TResult> observer, TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector, IAsyncScheduler scheduler)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (condition == null)
                throw new ArgumentNullException(nameof(condition));
            if (iterate == null)
                throw new ArgumentNullException(nameof(iterate));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));
            if (timeSelector == null)
                throw new ArgumentNullException(nameof(timeSelector));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return Generate<TState, TResult>(observer, initialState, s => new ValueTask<bool>(condition(s)), s => new ValueTask<TState>(iterate(s)), s => new ValueTask<TResult>(resultSelector(s)), s => new ValueTask<DateTimeOffset>(timeSelector(s)), scheduler);
        }

        /// <summary>
        /// Absolute-time, asynchronous-delegate overload without a scheduler argument; forwards to the
        /// scheduler overload with <c>TaskPoolAsyncScheduler.Default</c>.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or a delegate argument is <c>null</c>.</exception>
        public static ValueTask<IAsyncDisposable> Generate<TState, TResult>(IAsyncObserver<TResult> observer, TState initialState, Func<TState, ValueTask<bool>> condition, Func<TState, ValueTask<TState>> iterate, Func<TState, ValueTask<TResult>> resultSelector, Func<TState, ValueTask<DateTimeOffset>> timeSelector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (condition == null)
                throw new ArgumentNullException(nameof(condition));
            if (iterate == null)
                throw new ArgumentNullException(nameof(iterate));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));
            if (timeSelector == null)
                throw new ArgumentNullException(nameof(timeSelector));

            return Generate(observer, initialState, condition, iterate, resultSelector, timeSelector, TaskPoolAsyncScheduler.Default);
        }

        /// <summary>
        /// Core absolute-time loop. Behaves like the untimed loop, and additionally evaluates
        /// <paramref name="timeSelector"/> for every emitted state and awaits <c>scheduler.Delay</c> with
        /// that <see cref="DateTimeOffset"/> after the element has been sent.
        /// </summary>
        /// <param name="observer">Receives <c>OnNextAsync</c>, <c>OnErrorAsync</c> and <c>OnCompletedAsync</c> calls.</param>
        /// <param name="initialState">State used for the first pass of the loop.</param>
        /// <param name="condition">Returns <c>true</c> while the loop should keep producing elements.</param>
        /// <param name="iterate">Computes the next state from the current one.</param>
        /// <param name="resultSelector">Maps the current state to the element to emit.</param>
        /// <param name="timeSelector">Maps the current state to the due time awaited after the element is emitted.</param>
        /// <param name="scheduler">Scheduler the loop is submitted to and that provides <c>Delay</c>.</param>
        /// <returns>The value returned by <c>scheduler.ScheduleAsync</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/>, a delegate argument or <paramref name="scheduler"/> is <c>null</c>.</exception>
        public static ValueTask<IAsyncDisposable> Generate<TState, TResult>(IAsyncObserver<TResult> observer, TState initialState, Func<TState, ValueTask<bool>> condition, Func<TState, ValueTask<TState>> iterate, Func<TState, ValueTask<TResult>> resultSelector, Func<TState, ValueTask<DateTimeOffset>> timeSelector, IAsyncScheduler scheduler)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (condition == null)
                throw new ArgumentNullException(nameof(condition));
            if (iterate == null)
                throw new ArgumentNullException(nameof(iterate));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));
            if (timeSelector == null)
                throw new ArgumentNullException(nameof(timeSelector));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return scheduler.ScheduleAsync(async ct =>
            {
                var first = true;
                var state = initialState;

                while (!ct.IsCancellationRequested)
                {
                    var hasResult = false;
                    var result = default(TResult);
                    var nextDue = default(DateTimeOffset);

                    // Only the user-supplied delegates run inside the try block, so only their
                    // failures are routed to OnErrorAsync.
                    try
                    {
                        if (first)
                        {
                            // The initial state is emitted as-is; iterate is skipped on the first pass.
                            first = false;
                        }
                        else
                        {
                            state = await iterate(state).RendezVous(scheduler, ct);
                        }

                        hasResult = await condition(state).RendezVous(scheduler, ct);

                        if (hasResult)
                        {
                            result = await resultSelector(state).RendezVous(scheduler, ct);
                            nextDue = await timeSelector(state).RendezVous(scheduler, ct);
                        }
                    }
                    catch (Exception ex)
                    {
                        await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                        return;
                    }

                    if (hasResult)
                    {
                        await observer.OnNextAsync(result).RendezVous(scheduler, ct);
                    }
                    else
                    {
                        break;
                    }

                    // NOTE(review): the wait happens after the element is emitted, so the first element
                    // is sent without waiting for its due time - confirm this ordering is intended.
                    await scheduler.Delay(nextDue).RendezVous(scheduler, ct);
                }

                // Completion is signalled only when the loop was not stopped by cancellation.
                if (!ct.IsCancellationRequested)
                {
                    await observer.OnCompletedAsync().RendezVous(scheduler, ct);
                }
            });
        }
    }
}