// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Reactive.Concurrency;
using System.Reactive.Subjects;
using System.Threading.Tasks;

namespace System.Reactive.Linq
{
    // REVIEW: Expose Replay using ConcurrentAsyncAsyncSubject<T> underneath.

    // NOTE(review): the generic type arguments in this file were reconstructed after they had been
    // stripped from the text. In particular, confirm that the explicit type argument
    // IAsyncSubject<TSource, TSource> used with Task.FromResult matches the subject-factory type
    // expected by the Multicast overload taking an asynchronous selector, and that the selector
    // input type IAsyncObservable<TSource> matches the Multicast selector parameter.

    partial class AsyncObservable
    {
        /// <summary>
        /// Multicasts <paramref name="source"/> through a new <c>SequentialReplayAsyncSubject</c> created with its parameterless constructor.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Sequence to multicast.</param>
        /// <returns>The connectable sequence returned by <c>Multicast</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IConnectableAsyncObservable<TSource> Replay<TSource>(this IAsyncObservable<TSource> source)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));

            return Multicast(source, new SequentialReplayAsyncSubject<TSource>());
        }

        /// <summary>
        /// Multicasts <paramref name="source"/> through a new <c>SequentialReplayAsyncSubject</c> created with the given scheduler.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Sequence to multicast.</param>
        /// <param name="scheduler">Scheduler handed to the subject's constructor.</param>
        /// <returns>The connectable sequence returned by <c>Multicast</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
        public static IConnectableAsyncObservable<TSource> Replay<TSource>(this IAsyncObservable<TSource> source, IAsyncScheduler scheduler)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return Multicast(source, new SequentialReplayAsyncSubject<TSource>(scheduler));
        }

        /// <summary>
        /// Multicasts <paramref name="source"/> through a new <c>SequentialReplayAsyncSubject</c> created with the given buffer size.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Sequence to multicast.</param>
        /// <param name="bufferSize">Buffer size handed to the subject's constructor (presumably the maximum number of elements replayed); must not be negative.</param>
        /// <returns>The connectable sequence returned by <c>Multicast</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
        public static IConnectableAsyncObservable<TSource> Replay<TSource>(this IAsyncObservable<TSource> source, int bufferSize)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (bufferSize < 0)
                throw new ArgumentOutOfRangeException(nameof(bufferSize));

            return Multicast(source, new SequentialReplayAsyncSubject<TSource>(bufferSize));
        }

        /// <summary>
        /// Multicasts <paramref name="source"/> through a new <c>SequentialReplayAsyncSubject</c> created with the given buffer size and scheduler.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Sequence to multicast.</param>
        /// <param name="bufferSize">Buffer size handed to the subject's constructor (presumably the maximum number of elements replayed); must not be negative.</param>
        /// <param name="scheduler">Scheduler handed to the subject's constructor.</param>
        /// <returns>The connectable sequence returned by <c>Multicast</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
        public static IConnectableAsyncObservable<TSource> Replay<TSource>(this IAsyncObservable<TSource> source, int bufferSize, IAsyncScheduler scheduler)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (bufferSize < 0)
                throw new ArgumentOutOfRangeException(nameof(bufferSize));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return Multicast(source, new SequentialReplayAsyncSubject<TSource>(bufferSize, scheduler));
        }

        /// <summary>
        /// Multicasts <paramref name="source"/> through a new <c>SequentialReplayAsyncSubject</c> created with the given time window.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Sequence to multicast.</param>
        /// <param name="window">Time window handed to the subject's constructor (presumably the maximum age of replayed elements); must not be negative.</param>
        /// <returns>The connectable sequence returned by <c>Multicast</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
        public static IConnectableAsyncObservable<TSource> Replay<TSource>(this IAsyncObservable<TSource> source, TimeSpan window)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (window < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(window));

            return Multicast(source, new SequentialReplayAsyncSubject<TSource>(window));
        }

        /// <summary>
        /// Multicasts <paramref name="source"/> through a new <c>SequentialReplayAsyncSubject</c> created with the given time window and scheduler.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Sequence to multicast.</param>
        /// <param name="window">Time window handed to the subject's constructor (presumably the maximum age of replayed elements); must not be negative.</param>
        /// <param name="scheduler">Scheduler handed to the subject's constructor.</param>
        /// <returns>The connectable sequence returned by <c>Multicast</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
        public static IConnectableAsyncObservable<TSource> Replay<TSource>(this IAsyncObservable<TSource> source, TimeSpan window, IAsyncScheduler scheduler)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (window < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(window));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return Multicast(source, new SequentialReplayAsyncSubject<TSource>(window, scheduler));
        }

        /// <summary>
        /// Multicasts <paramref name="source"/> through a new <c>SequentialReplayAsyncSubject</c> created with the given buffer size and time window.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Sequence to multicast.</param>
        /// <param name="bufferSize">Buffer size handed to the subject's constructor (presumably the maximum number of elements replayed); must not be negative.</param>
        /// <param name="window">Time window handed to the subject's constructor (presumably the maximum age of replayed elements); must not be negative.</param>
        /// <returns>The connectable sequence returned by <c>Multicast</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero, or <paramref name="window"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
        public static IConnectableAsyncObservable<TSource> Replay<TSource>(this IAsyncObservable<TSource> source, int bufferSize, TimeSpan window)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (bufferSize < 0)
                throw new ArgumentOutOfRangeException(nameof(bufferSize));
            if (window < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(window));

            return Multicast(source, new SequentialReplayAsyncSubject<TSource>(bufferSize, window));
        }

        /// <summary>
        /// Multicasts <paramref name="source"/> through a new <c>SequentialReplayAsyncSubject</c> created with the given buffer size, time window and scheduler.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Sequence to multicast.</param>
        /// <param name="bufferSize">Buffer size handed to the subject's constructor (presumably the maximum number of elements replayed); must not be negative.</param>
        /// <param name="window">Time window handed to the subject's constructor (presumably the maximum age of replayed elements); must not be negative.</param>
        /// <param name="scheduler">Scheduler handed to the subject's constructor.</param>
        /// <returns>The connectable sequence returned by <c>Multicast</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero, or <paramref name="window"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
        public static IConnectableAsyncObservable<TSource> Replay<TSource>(this IAsyncObservable<TSource> source, int bufferSize, TimeSpan window, IAsyncScheduler scheduler)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (bufferSize < 0)
                throw new ArgumentOutOfRangeException(nameof(bufferSize));
            if (window < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(window));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return Multicast(source, new SequentialReplayAsyncSubject<TSource>(bufferSize, window, scheduler));
        }

        /// <summary>
        /// Calls <c>Multicast</c> with a factory that creates a <c>SequentialReplayAsyncSubject</c> through its parameterless constructor, and with <paramref name="selector"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TResult">Element type of the sequence produced by <paramref name="selector"/>.</typeparam>
        /// <param name="source">Sequence to multicast.</param>
        /// <param name="selector">Function handed to <c>Multicast</c> as its selector.</param>
        /// <returns>The sequence returned by <c>Multicast</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
        public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObservable<TResult>> selector)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return Multicast(source, () => new SequentialReplayAsyncSubject<TSource>(), selector);
        }

        /// <summary>
        /// Calls <c>Multicast</c> with a factory that creates a <c>SequentialReplayAsyncSubject</c> using the given scheduler, and with <paramref name="selector"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TResult">Element type of the sequence produced by <paramref name="selector"/>.</typeparam>
        /// <param name="source">Sequence to multicast.</param>
        /// <param name="selector">Function handed to <c>Multicast</c> as its selector.</param>
        /// <param name="scheduler">Scheduler handed to the subject's constructor.</param>
        /// <returns>The sequence returned by <c>Multicast</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="selector"/> or <paramref name="scheduler"/> is null.</exception>
        public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObservable<TResult>> selector, IAsyncScheduler scheduler)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return Multicast(source, () => new SequentialReplayAsyncSubject<TSource>(scheduler), selector);
        }

        /// <summary>
        /// Calls <c>Multicast</c> with a factory that creates a <c>SequentialReplayAsyncSubject</c> using the given buffer size, and with <paramref name="selector"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TResult">Element type of the sequence produced by <paramref name="selector"/>.</typeparam>
        /// <param name="source">Sequence to multicast.</param>
        /// <param name="selector">Function handed to <c>Multicast</c> as its selector.</param>
        /// <param name="bufferSize">Buffer size handed to the subject's constructor (presumably the maximum number of elements replayed); must not be negative.</param>
        /// <returns>The sequence returned by <c>Multicast</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
        public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObservable<TResult>> selector, int bufferSize)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));
            if (bufferSize < 0)
                throw new ArgumentOutOfRangeException(nameof(bufferSize));

            return Multicast(source, () => new SequentialReplayAsyncSubject<TSource>(bufferSize), selector);
        }

        /// <summary>
        /// Calls <c>Multicast</c> with a factory that creates a <c>SequentialReplayAsyncSubject</c> using the given buffer size and scheduler, and with <paramref name="selector"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TResult">Element type of the sequence produced by <paramref name="selector"/>.</typeparam>
        /// <param name="source">Sequence to multicast.</param>
        /// <param name="selector">Function handed to <c>Multicast</c> as its selector.</param>
        /// <param name="bufferSize">Buffer size handed to the subject's constructor (presumably the maximum number of elements replayed); must not be negative.</param>
        /// <param name="scheduler">Scheduler handed to the subject's constructor.</param>
        /// <returns>The sequence returned by <c>Multicast</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="selector"/> or <paramref name="scheduler"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
        public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObservable<TResult>> selector, int bufferSize, IAsyncScheduler scheduler)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));
            if (bufferSize < 0)
                throw new ArgumentOutOfRangeException(nameof(bufferSize));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return Multicast(source, () => new SequentialReplayAsyncSubject<TSource>(bufferSize, scheduler), selector);
        }

        /// <summary>
        /// Calls <c>Multicast</c> with a factory that creates a <c>SequentialReplayAsyncSubject</c> using the given time window, and with <paramref name="selector"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TResult">Element type of the sequence produced by <paramref name="selector"/>.</typeparam>
        /// <param name="source">Sequence to multicast.</param>
        /// <param name="selector">Function handed to <c>Multicast</c> as its selector.</param>
        /// <param name="window">Time window handed to the subject's constructor (presumably the maximum age of replayed elements); must not be negative.</param>
        /// <returns>The sequence returned by <c>Multicast</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
        public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObservable<TResult>> selector, TimeSpan window)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));
            if (window < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(window));

            return Multicast(source, () => new SequentialReplayAsyncSubject<TSource>(window), selector);
        }

        /// <summary>
        /// Calls <c>Multicast</c> with a factory that creates a <c>SequentialReplayAsyncSubject</c> using the given time window and scheduler, and with <paramref name="selector"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TResult">Element type of the sequence produced by <paramref name="selector"/>.</typeparam>
        /// <param name="source">Sequence to multicast.</param>
        /// <param name="selector">Function handed to <c>Multicast</c> as its selector.</param>
        /// <param name="window">Time window handed to the subject's constructor (presumably the maximum age of replayed elements); must not be negative.</param>
        /// <param name="scheduler">Scheduler handed to the subject's constructor.</param>
        /// <returns>The sequence returned by <c>Multicast</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="selector"/> or <paramref name="scheduler"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
        public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObservable<TResult>> selector, TimeSpan window, IAsyncScheduler scheduler)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));
            if (window < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(window));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return Multicast(source, () => new SequentialReplayAsyncSubject<TSource>(window, scheduler), selector);
        }

        /// <summary>
        /// Calls <c>Multicast</c> with a factory that creates a <c>SequentialReplayAsyncSubject</c> using the given buffer size and time window, and with <paramref name="selector"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TResult">Element type of the sequence produced by <paramref name="selector"/>.</typeparam>
        /// <param name="source">Sequence to multicast.</param>
        /// <param name="selector">Function handed to <c>Multicast</c> as its selector.</param>
        /// <param name="bufferSize">Buffer size handed to the subject's constructor (presumably the maximum number of elements replayed); must not be negative.</param>
        /// <param name="window">Time window handed to the subject's constructor (presumably the maximum age of replayed elements); must not be negative.</param>
        /// <returns>The sequence returned by <c>Multicast</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero, or <paramref name="window"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
        public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObservable<TResult>> selector, int bufferSize, TimeSpan window)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));
            if (bufferSize < 0)
                throw new ArgumentOutOfRangeException(nameof(bufferSize));
            if (window < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(window));

            return Multicast(source, () => new SequentialReplayAsyncSubject<TSource>(bufferSize, window), selector);
        }

        /// <summary>
        /// Calls <c>Multicast</c> with a factory that creates a <c>SequentialReplayAsyncSubject</c> using the given buffer size, time window and scheduler, and with <paramref name="selector"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TResult">Element type of the sequence produced by <paramref name="selector"/>.</typeparam>
        /// <param name="source">Sequence to multicast.</param>
        /// <param name="selector">Function handed to <c>Multicast</c> as its selector.</param>
        /// <param name="bufferSize">Buffer size handed to the subject's constructor (presumably the maximum number of elements replayed); must not be negative.</param>
        /// <param name="window">Time window handed to the subject's constructor (presumably the maximum age of replayed elements); must not be negative.</param>
        /// <param name="scheduler">Scheduler handed to the subject's constructor.</param>
        /// <returns>The sequence returned by <c>Multicast</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="selector"/> or <paramref name="scheduler"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero, or <paramref name="window"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
        public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObservable<TResult>> selector, int bufferSize, TimeSpan window, IAsyncScheduler scheduler)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));
            if (bufferSize < 0)
                throw new ArgumentOutOfRangeException(nameof(bufferSize));
            if (window < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(window));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return Multicast(source, () => new SequentialReplayAsyncSubject<TSource>(bufferSize, window, scheduler), selector);
        }

        /// <summary>
        /// Calls <c>Multicast</c> with an asynchronous <paramref name="selector"/> and a factory that wraps a new <c>SequentialReplayAsyncSubject</c>
        /// (parameterless constructor) in an already-completed task.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TResult">Element type of the sequence produced by <paramref name="selector"/>.</typeparam>
        /// <param name="source">Sequence to multicast.</param>
        /// <param name="selector">Task-returning function handed to <c>Multicast</c> as its selector.</param>
        /// <returns>The sequence returned by <c>Multicast</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
        public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, Task<IAsyncObservable<TResult>>> selector)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return Multicast(source, () => Task.FromResult<IAsyncSubject<TSource, TSource>>(new SequentialReplayAsyncSubject<TSource>()), selector);
        }

        /// <summary>
        /// Calls <c>Multicast</c> with an asynchronous <paramref name="selector"/> and a factory that wraps a new <c>SequentialReplayAsyncSubject</c>
        /// (created with the given scheduler) in an already-completed task.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TResult">Element type of the sequence produced by <paramref name="selector"/>.</typeparam>
        /// <param name="source">Sequence to multicast.</param>
        /// <param name="selector">Task-returning function handed to <c>Multicast</c> as its selector.</param>
        /// <param name="scheduler">Scheduler handed to the subject's constructor.</param>
        /// <returns>The sequence returned by <c>Multicast</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="selector"/> or <paramref name="scheduler"/> is null.</exception>
        public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, Task<IAsyncObservable<TResult>>> selector, IAsyncScheduler scheduler)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return Multicast(source, () => Task.FromResult<IAsyncSubject<TSource, TSource>>(new SequentialReplayAsyncSubject<TSource>(scheduler)), selector);
        }

        /// <summary>
        /// Calls <c>Multicast</c> with an asynchronous <paramref name="selector"/> and a factory that wraps a new <c>SequentialReplayAsyncSubject</c>
        /// (created with the given buffer size) in an already-completed task.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TResult">Element type of the sequence produced by <paramref name="selector"/>.</typeparam>
        /// <param name="source">Sequence to multicast.</param>
        /// <param name="selector">Task-returning function handed to <c>Multicast</c> as its selector.</param>
        /// <param name="bufferSize">Buffer size handed to the subject's constructor (presumably the maximum number of elements replayed); must not be negative.</param>
        /// <returns>The sequence returned by <c>Multicast</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
        public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, Task<IAsyncObservable<TResult>>> selector, int bufferSize)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));
            if (bufferSize < 0)
                throw new ArgumentOutOfRangeException(nameof(bufferSize));

            return Multicast(source, () => Task.FromResult<IAsyncSubject<TSource, TSource>>(new SequentialReplayAsyncSubject<TSource>(bufferSize)), selector);
        }

        /// <summary>
        /// Calls <c>Multicast</c> with an asynchronous <paramref name="selector"/> and a factory that wraps a new <c>SequentialReplayAsyncSubject</c>
        /// (created with the given buffer size and scheduler) in an already-completed task.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TResult">Element type of the sequence produced by <paramref name="selector"/>.</typeparam>
        /// <param name="source">Sequence to multicast.</param>
        /// <param name="selector">Task-returning function handed to <c>Multicast</c> as its selector.</param>
        /// <param name="bufferSize">Buffer size handed to the subject's constructor (presumably the maximum number of elements replayed); must not be negative.</param>
        /// <param name="scheduler">Scheduler handed to the subject's constructor.</param>
        /// <returns>The sequence returned by <c>Multicast</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="selector"/> or <paramref name="scheduler"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
        public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, Task<IAsyncObservable<TResult>>> selector, int bufferSize, IAsyncScheduler scheduler)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));
            if (bufferSize < 0)
                throw new ArgumentOutOfRangeException(nameof(bufferSize));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return Multicast(source, () => Task.FromResult<IAsyncSubject<TSource, TSource>>(new SequentialReplayAsyncSubject<TSource>(bufferSize, scheduler)), selector);
        }

        /// <summary>
        /// Calls <c>Multicast</c> with an asynchronous <paramref name="selector"/> and a factory that wraps a new <c>SequentialReplayAsyncSubject</c>
        /// (created with the given time window) in an already-completed task.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TResult">Element type of the sequence produced by <paramref name="selector"/>.</typeparam>
        /// <param name="source">Sequence to multicast.</param>
        /// <param name="selector">Task-returning function handed to <c>Multicast</c> as its selector.</param>
        /// <param name="window">Time window handed to the subject's constructor (presumably the maximum age of replayed elements); must not be negative.</param>
        /// <returns>The sequence returned by <c>Multicast</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
        public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, Task<IAsyncObservable<TResult>>> selector, TimeSpan window)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));
            if (window < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(window));

            return Multicast(source, () => Task.FromResult<IAsyncSubject<TSource, TSource>>(new SequentialReplayAsyncSubject<TSource>(window)), selector);
        }

        /// <summary>
        /// Calls <c>Multicast</c> with an asynchronous <paramref name="selector"/> and a factory that wraps a new <c>SequentialReplayAsyncSubject</c>
        /// (created with the given time window and scheduler) in an already-completed task.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TResult">Element type of the sequence produced by <paramref name="selector"/>.</typeparam>
        /// <param name="source">Sequence to multicast.</param>
        /// <param name="selector">Task-returning function handed to <c>Multicast</c> as its selector.</param>
        /// <param name="window">Time window handed to the subject's constructor (presumably the maximum age of replayed elements); must not be negative.</param>
        /// <param name="scheduler">Scheduler handed to the subject's constructor.</param>
        /// <returns>The sequence returned by <c>Multicast</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="selector"/> or <paramref name="scheduler"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
        public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, Task<IAsyncObservable<TResult>>> selector, TimeSpan window, IAsyncScheduler scheduler)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));
            if (window < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(window));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return Multicast(source, () => Task.FromResult<IAsyncSubject<TSource, TSource>>(new SequentialReplayAsyncSubject<TSource>(window, scheduler)), selector);
        }

        /// <summary>
        /// Calls <c>Multicast</c> with an asynchronous <paramref name="selector"/> and a factory that wraps a new <c>SequentialReplayAsyncSubject</c>
        /// (created with the given buffer size and time window) in an already-completed task.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TResult">Element type of the sequence produced by <paramref name="selector"/>.</typeparam>
        /// <param name="source">Sequence to multicast.</param>
        /// <param name="selector">Task-returning function handed to <c>Multicast</c> as its selector.</param>
        /// <param name="bufferSize">Buffer size handed to the subject's constructor (presumably the maximum number of elements replayed); must not be negative.</param>
        /// <param name="window">Time window handed to the subject's constructor (presumably the maximum age of replayed elements); must not be negative.</param>
        /// <returns>The sequence returned by <c>Multicast</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero, or <paramref name="window"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
        public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, Task<IAsyncObservable<TResult>>> selector, int bufferSize, TimeSpan window)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));
            if (bufferSize < 0)
                throw new ArgumentOutOfRangeException(nameof(bufferSize));
            if (window < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(window));

            return Multicast(source, () => Task.FromResult<IAsyncSubject<TSource, TSource>>(new SequentialReplayAsyncSubject<TSource>(bufferSize, window)), selector);
        }

        /// <summary>
        /// Calls <c>Multicast</c> with an asynchronous <paramref name="selector"/> and a factory that wraps a new <c>SequentialReplayAsyncSubject</c>
        /// (created with the given buffer size, time window and scheduler) in an already-completed task.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TResult">Element type of the sequence produced by <paramref name="selector"/>.</typeparam>
        /// <param name="source">Sequence to multicast.</param>
        /// <param name="selector">Task-returning function handed to <c>Multicast</c> as its selector.</param>
        /// <param name="bufferSize">Buffer size handed to the subject's constructor (presumably the maximum number of elements replayed); must not be negative.</param>
        /// <param name="window">Time window handed to the subject's constructor (presumably the maximum age of replayed elements); must not be negative.</param>
        /// <param name="scheduler">Scheduler handed to the subject's constructor.</param>
        /// <returns>The sequence returned by <c>Multicast</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="selector"/> or <paramref name="scheduler"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero, or <paramref name="window"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
        public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, Task<IAsyncObservable<TResult>>> selector, int bufferSize, TimeSpan window, IAsyncScheduler scheduler)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));
            if (bufferSize < 0)
                throw new ArgumentOutOfRangeException(nameof(bufferSize));
            if (window < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(window));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return Multicast(source, () => Task.FromResult<IAsyncSubject<TSource, TSource>>(new SequentialReplayAsyncSubject<TSource>(bufferSize, window, scheduler)), selector);
        }
    }
}