// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Linq
{
    partial class AsyncObservable
    {
        /// <summary>
        /// Forwards an element of <paramref name="source"/> only when no newer element arrives
        /// within <paramref name="dueTime"/> after it. Timing uses the default scheduler chosen by
        /// the matching <c>AsyncObserver.Throttle</c> overload.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Sequence to throttle.</param>
        /// <param name="dueTime">Quiet period that must elapse after an element before it is forwarded.</param>
        /// <returns>The throttled sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="dueTime"/> is negative.</exception>
        public static IAsyncObservable<TSource> Throttle<TSource>(this IAsyncObservable<TSource> source, TimeSpan dueTime)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (dueTime < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(dueTime));

            return Create<TSource>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var (sink, throttler) = AsyncObserver.Throttle(observer, dueTime);

                // Register the throttler first so it is already tracked when the source starts pushing.
                await d.AddAsync(throttler).ConfigureAwait(false);

                var sourceSubscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);

                await d.AddAsync(sourceSubscription).ConfigureAwait(false);

                return d;
            });
        }

        /// <summary>
        /// Forwards an element of <paramref name="source"/> only when no newer element arrives
        /// within <paramref name="dueTime"/> after it, using <paramref name="scheduler"/> to run the timers.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Sequence to throttle.</param>
        /// <param name="dueTime">Quiet period that must elapse after an element before it is forwarded.</param>
        /// <param name="scheduler">Scheduler on which the delayed forwarding work is scheduled.</param>
        /// <returns>The throttled sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="dueTime"/> is negative.</exception>
        public static IAsyncObservable<TSource> Throttle<TSource>(this IAsyncObservable<TSource> source, TimeSpan dueTime, IAsyncScheduler scheduler)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (dueTime < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(dueTime));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return Create<TSource>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var (sink, throttler) = AsyncObserver.Throttle(observer, dueTime, scheduler);

                await d.AddAsync(throttler).ConfigureAwait(false);

                var sourceSubscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);

                await d.AddAsync(sourceSubscription).ConfigureAwait(false);

                return d;
            });
        }

        /// <summary>
        /// Forwards an element of <paramref name="source"/> once the sequence returned by
        /// <paramref name="throttleSelector"/> for that element produces a value or completes,
        /// provided no newer source element has arrived in the meantime.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TThrottle">Element type of the per-element throttle sequences; the values themselves are ignored.</typeparam>
        /// <param name="source">Sequence to throttle.</param>
        /// <param name="throttleSelector">Produces, for each source element, the sequence whose first signal ends that element's quiet period.</param>
        /// <returns>The throttled sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="throttleSelector"/> is null.</exception>
        public static IAsyncObservable<TSource> Throttle<TSource, TThrottle>(this IAsyncObservable<TSource> source, Func<TSource, IAsyncObservable<TThrottle>> throttleSelector)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (throttleSelector == null)
                throw new ArgumentNullException(nameof(throttleSelector));

            return Create<TSource>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var (sink, throttler) = AsyncObserver.Throttle(observer, throttleSelector);

                await d.AddAsync(throttler).ConfigureAwait(false);

                var sourceSubscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);

                await d.AddAsync(sourceSubscription).ConfigureAwait(false);

                return d;
            });
        }
    }

    partial class AsyncObserver
    {
        /// <summary>
        /// Builds a time-based throttling observer in front of <paramref name="observer"/>,
        /// scheduling its timers on <see cref="TaskPoolAsyncScheduler.Default"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type received and forwarded.</typeparam>
        /// <param name="observer">Downstream observer that receives the throttled notifications.</param>
        /// <param name="dueTime">Quiet period that must elapse after an element before it is forwarded.</param>
        /// <returns>The throttling observer, and the disposable holding the currently pending timer.</returns>
        public static (IAsyncObserver<TSource>, IAsyncDisposable) Throttle<TSource>(IAsyncObserver<TSource> observer, TimeSpan dueTime) => Throttle(observer, dueTime, TaskPoolAsyncScheduler.Default);

        /// <summary>
        /// Builds a time-based throttling observer in front of <paramref name="observer"/>.
        /// Every incoming element replaces the pending one and schedules a forwarding callback
        /// after <paramref name="dueTime"/>; the callback forwards only if no newer element arrived.
        /// Completion flushes a still-pending element before completing; an error is passed on
        /// without flushing.
        /// </summary>
        /// <typeparam name="TSource">Element type received and forwarded.</typeparam>
        /// <param name="observer">Downstream observer that receives the throttled notifications.</param>
        /// <param name="dueTime">Quiet period that must elapse after an element before it is forwarded.</param>
        /// <param name="scheduler">Scheduler on which the delayed forwarding work is scheduled.</param>
        /// <returns>The throttling observer, and the disposable holding the currently pending timer.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="scheduler"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="dueTime"/> is negative.</exception>
        public static (IAsyncObserver<TSource>, IAsyncDisposable) Throttle<TSource>(IAsyncObserver<TSource> observer, TimeSpan dueTime, IAsyncScheduler scheduler)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (dueTime < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(dueTime));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            // gate serializes access to the shared state below and to the downstream observer.
            var gate = new AsyncLock();

            // Holds the timer of the most recent element.
            // NOTE(review): presumably assigning a new inner disposable disposes the previous one - confirm against SerialAsyncDisposable.
            var timer = new SerialAsyncDisposable();

            var hasValue = false;
            var value = default(TSource);

            // Incremented for every element and terminal notification; a timer callback only
            // forwards when the id it captured is still the current one.
            var id = 0UL;

            return
            (
                Create<TSource>(
                    async x =>
                    {
                        var myId = default(ulong);

                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            hasValue = true;
                            value = x;
                            myId = ++id;
                        }

                        // Publish the placeholder before scheduling so the timer slot already
                        // refers to this element while the scheduling call is in flight.
                        var d = new SingleAssignmentAsyncDisposable();
                        await timer.AssignAsync(d).ConfigureAwait(false);

                        var t = await scheduler.ScheduleAsync(async ct =>
                        {
                            if (!ct.IsCancellationRequested)
                            {
                                using (await gate.LockAsync().ConfigureAwait(false))
                                {
                                    if (hasValue && id == myId)
                                    {
                                        await observer.OnNextAsync(value).ConfigureAwait(false);
                                    }

                                    // Cleared whether or not this callback was the one that forwarded.
                                    hasValue = false;
                                }
                            }
                        }, dueTime).ConfigureAwait(false);

                        await d.AssignAsync(t).ConfigureAwait(false);
                    },
                    async ex =>
                    {
                        await timer.DisposeAsync().ConfigureAwait(false);

                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            await observer.OnErrorAsync(ex).ConfigureAwait(false);

                            // Drop the pending element and invalidate any callback still in flight.
                            hasValue = false;
                            id++;
                        }
                    },
                    async () =>
                    {
                        await timer.DisposeAsync().ConfigureAwait(false);

                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            // Flush the element that was still waiting out its quiet period.
                            if (hasValue)
                            {
                                await observer.OnNextAsync(value).ConfigureAwait(false);
                            }

                            await observer.OnCompletedAsync().ConfigureAwait(false);

                            hasValue = false;
                            id++;
                        }
                    }
                ),
                timer
            );
        }

        /// <summary>
        /// Builds a selector-based throttling observer in front of <paramref name="observer"/>.
        /// For every incoming element the sequence returned by <paramref name="throttleSelector"/>
        /// is subscribed to; when it produces a value or completes, the element is forwarded
        /// provided no newer element arrived. Completion of the source flushes a still-pending
        /// element; errors from the source, the selector, or a throttle sequence are passed on.
        /// </summary>
        /// <typeparam name="TSource">Element type received and forwarded.</typeparam>
        /// <typeparam name="TThrottle">Element type of the per-element throttle sequences; the values themselves are ignored.</typeparam>
        /// <param name="observer">Downstream observer that receives the throttled notifications.</param>
        /// <param name="throttleSelector">Produces, for each element, the sequence whose first signal ends that element's quiet period.</param>
        /// <returns>The throttling observer, and the disposable holding the current throttle subscription.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="throttleSelector"/> is null.</exception>
        public static (IAsyncObserver<TSource>, IAsyncDisposable) Throttle<TSource, TThrottle>(IAsyncObserver<TSource> observer, Func<TSource, IAsyncObservable<TThrottle>> throttleSelector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (throttleSelector == null)
                throw new ArgumentNullException(nameof(throttleSelector));

            // gate serializes access to the shared state below and to the downstream observer.
            var gate = new AsyncLock();

            // Holds the subscription to the throttle sequence of the most recent element.
            var throttler = new SerialAsyncDisposable();

            var hasValue = false;
            var value = default(TSource);

            // Incremented for every element and terminal notification; a throttle observer only
            // forwards when the id it captured is still the current one.
            var id = 0UL;

            return
            (
                Create<TSource>(
                    async x =>
                    {
                        var throttleSource = default(IAsyncObservable<TThrottle>);

                        try
                        {
                            throttleSource = throttleSelector(x); // REVIEW: Do we need an async variant?
                        }
                        catch (Exception ex)
                        {
                            // A failing selector is surfaced downstream as an error of the sequence.
                            using (await gate.LockAsync().ConfigureAwait(false))
                            {
                                await observer.OnErrorAsync(ex).ConfigureAwait(false);
                            }

                            return;
                        }

                        var myId = default(ulong);

                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            hasValue = true;
                            value = x;
                            myId = ++id;
                        }

                        // Publish the placeholder before subscribing so the throttler slot already
                        // refers to this element while the subscription is being established.
                        var d = new SingleAssignmentAsyncDisposable();
                        await throttler.AssignAsync(d).ConfigureAwait(false);

                        var throttleObserver = Create<TThrottle>(
                            async y =>
                            {
                                using (await gate.LockAsync().ConfigureAwait(false))
                                {
                                    if (hasValue && myId == id)
                                    {
                                        await observer.OnNextAsync(x).ConfigureAwait(false);
                                    }

                                    hasValue = false;

                                    // The first throttle signal is all that matters; stop listening.
                                    await d.DisposeAsync().ConfigureAwait(false);
                                }
                            },
                            async ex =>
                            {
                                using (await gate.LockAsync().ConfigureAwait(false))
                                {
                                    await observer.OnErrorAsync(ex).ConfigureAwait(false);
                                }
                            },
                            async () =>
                            {
                                // Completion of the throttle sequence ends the quiet period just like a value does.
                                using (await gate.LockAsync().ConfigureAwait(false))
                                {
                                    if (hasValue && myId == id)
                                    {
                                        await observer.OnNextAsync(x).ConfigureAwait(false);
                                    }

                                    hasValue = false;

                                    await d.DisposeAsync().ConfigureAwait(false);
                                }
                            }
                        );

                        var t = await throttleSource.SubscribeSafeAsync(throttleObserver).ConfigureAwait(false);

                        await d.AssignAsync(t).ConfigureAwait(false);
                    },
                    async ex =>
                    {
                        await throttler.DisposeAsync().ConfigureAwait(false);

                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            await observer.OnErrorAsync(ex).ConfigureAwait(false);

                            // Drop the pending element and invalidate any throttle observer still in flight.
                            hasValue = false;
                            id++;
                        }
                    },
                    async () =>
                    {
                        await throttler.DisposeAsync().ConfigureAwait(false);

                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            // Flush the element that was still waiting for its throttle sequence.
                            if (hasValue)
                            {
                                await observer.OnNextAsync(value).ConfigureAwait(false);
                            }

                            await observer.OnCompletedAsync().ConfigureAwait(false);

                            hasValue = false;
                            id++;
                        }
                    }
                ),
                throttler
            );
        }
    }
}