DelaySubscription.cs 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. namespace System.Reactive.Linq
  7. {
  public partial class AsyncObservable
  {
      /// <summary>
      /// Delays the subscription to <paramref name="source"/> by <paramref name="dueTime"/>,
      /// scheduling the subscription work on <see cref="TaskPoolAsyncScheduler.Default"/>.
      /// </summary>
      /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
      /// <param name="source">The sequence whose subscription is deferred.</param>
      /// <param name="dueTime">The time span handed to the scheduler as the due time of the subscription work.</param>
      /// <returns>The sequence produced by the scheduler-accepting overload.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="source"/> is <c>null</c>.</exception>
      public static IAsyncObservable<TSource> DelaySubscription<TSource>(this IAsyncObservable<TSource> source, TimeSpan dueTime)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          return DelaySubscription(source, dueTime, TaskPoolAsyncScheduler.Default);
      }

      /// <summary>
      /// Delays the subscription to <paramref name="source"/> by <paramref name="dueTime"/>,
      /// scheduling the subscription work on <paramref name="scheduler"/>.
      /// </summary>
      /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
      /// <param name="source">The sequence whose subscription is deferred.</param>
      /// <param name="dueTime">The time span handed to the scheduler as the due time of the subscription work.</param>
      /// <param name="scheduler">The scheduler on which the subscription work is scheduled.</param>
      /// <returns>
      /// A sequence built with <c>Create</c>; each subscription to it schedules a work item that
      /// subscribes the observer to <paramref name="source"/>.
      /// </returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="source"/> or <paramref name="scheduler"/> is <c>null</c>.
      /// </exception>
      public static IAsyncObservable<TSource> DelaySubscription<TSource>(this IAsyncObservable<TSource> source, TimeSpan dueTime, IAsyncScheduler scheduler)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (scheduler is null)
          {
              throw new ArgumentNullException(nameof(scheduler));
          }

          return Create(
              source,
              (dueTime, scheduler),
              static async (upstream, state, downstream) =>
              {
                  var (delay, timer) = state;

                  // Returned to the subscriber; it holds the scheduled work item and, once that
                  // work has run, the subscription to the upstream sequence.
                  var subscription = new CompositeAsyncDisposable();

                  var scheduledWork = await timer.ScheduleAsync(async _ =>
                  {
                      var innerSubscription = await upstream.SubscribeSafeAsync(downstream).ConfigureAwait(false);
                      await subscription.AddAsync(innerSubscription).ConfigureAwait(false);
                  }, delay).ConfigureAwait(false);

                  await subscription.AddAsync(scheduledWork).ConfigureAwait(false);

                  return subscription;
              });
      }

      /// <summary>
      /// Delays the subscription to <paramref name="source"/> until <paramref name="dueTime"/>,
      /// scheduling the subscription work on <see cref="TaskPoolAsyncScheduler.Default"/>.
      /// </summary>
      /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
      /// <param name="source">The sequence whose subscription is deferred.</param>
      /// <param name="dueTime">The point in time handed to the scheduler as the due time of the subscription work.</param>
      /// <returns>The sequence produced by the scheduler-accepting overload.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="source"/> is <c>null</c>.</exception>
      public static IAsyncObservable<TSource> DelaySubscription<TSource>(this IAsyncObservable<TSource> source, DateTimeOffset dueTime)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          return DelaySubscription(source, dueTime, TaskPoolAsyncScheduler.Default);
      }

      /// <summary>
      /// Delays the subscription to <paramref name="source"/> until <paramref name="dueTime"/>,
      /// scheduling the subscription work on <paramref name="scheduler"/>.
      /// </summary>
      /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
      /// <param name="source">The sequence whose subscription is deferred.</param>
      /// <param name="dueTime">The point in time handed to the scheduler as the due time of the subscription work.</param>
      /// <param name="scheduler">The scheduler on which the subscription work is scheduled.</param>
      /// <returns>
      /// A sequence built with <c>Create</c>; each subscription to it schedules a work item that
      /// subscribes the observer to <paramref name="source"/>.
      /// </returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="source"/> or <paramref name="scheduler"/> is <c>null</c>.
      /// </exception>
      public static IAsyncObservable<TSource> DelaySubscription<TSource>(this IAsyncObservable<TSource> source, DateTimeOffset dueTime, IAsyncScheduler scheduler)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (scheduler is null)
          {
              throw new ArgumentNullException(nameof(scheduler));
          }

          return Create(
              source,
              (dueTime, scheduler),
              static async (upstream, state, downstream) =>
              {
                  var (due, timer) = state;

                  // Returned to the subscriber; it holds the scheduled work item and, once that
                  // work has run, the subscription to the upstream sequence.
                  var subscription = new CompositeAsyncDisposable();

                  var scheduledWork = await timer.ScheduleAsync(async _ =>
                  {
                      var innerSubscription = await upstream.SubscribeSafeAsync(downstream).ConfigureAwait(false);
                      await subscription.AddAsync(innerSubscription).ConfigureAwait(false);
                  }, due).ConfigureAwait(false);

                  await subscription.AddAsync(scheduledWork).ConfigureAwait(false);

                  return subscription;
              });
      }
  }
  65. }