ScheduledAsyncObserver.cs 1.3 KB

123456789101112131415161718192021222324252627282930313233343536
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Reactive
  9. {
  /// <summary>
  /// A <see cref="ScheduledAsyncObserverBase{T}"/> whose scheduling and rendez-vous
  /// awaitables are both driven by a single <see cref="IAsyncScheduler"/>.
  /// </summary>
  /// <typeparam name="T">Element type of the wrapped observer.</typeparam>
  internal sealed class ScheduledAsyncObserver<T> : ScheduledAsyncObserverBase<T>
  {
      // Scheduler used to queue RunAsync and to build the rendez-vous awaitables.
      private readonly IAsyncScheduler _scheduler;

      // Holds the disposable returned by the most recent ScheduleAsync call.
      private readonly SerialAsyncDisposable _disposable = new SerialAsyncDisposable();

      /// <summary>
      /// Creates an observer that forwards <paramref name="observer"/> to the base class
      /// and keeps <paramref name="scheduler"/> for later scheduling.
      /// </summary>
      /// <param name="observer">Observer passed through to the base constructor.</param>
      /// <param name="scheduler">Scheduler stored for use by this instance.</param>
      public ScheduledAsyncObserver(IAsyncObserver<T> observer, IAsyncScheduler scheduler)
          : base(observer) => _scheduler = scheduler;

      /// <summary>
      /// Disposes this observer by disposing the serial disposable it owns.
      /// </summary>
      /// <returns>The task returned by the underlying disposable's <c>DisposeAsync</c>.</returns>
      public override Task DisposeAsync()
      {
          return _disposable.DisposeAsync();
      }

      /// <summary>
      /// Wraps <paramref name="task"/> in a <c>TaskAwaitable</c> bound to this instance's scheduler.
      /// </summary>
      /// <param name="task">The task to wrap.</param>
      /// <returns>An awaitable over <paramref name="task"/>.</returns>
      protected override IAwaitable RendezVous(Task task)
      {
          // NOTE(review): the meaning of the `false` flag is defined by TaskAwaitable's
          // constructor, which is not visible here - confirm against that type.
          return new TaskAwaitable(task, false, _scheduler, CancellationToken.None);
      }

      /// <summary>
      /// Wraps <paramref name="task"/> in a <c>TaskAwaitable&lt;R&gt;</c> bound to this instance's scheduler.
      /// </summary>
      /// <typeparam name="R">Result type of the task.</typeparam>
      /// <param name="task">The task to wrap.</param>
      /// <returns>An awaitable over <paramref name="task"/>.</returns>
      protected override IAwaitable<R> RendezVous<R>(Task<R> task)
      {
          // NOTE(review): same `false` flag as the non-generic overload - confirm its meaning.
          return new TaskAwaitable<R>(task, false, _scheduler, CancellationToken.None);
      }

      /// <summary>
      /// Schedules <c>RunAsync</c> (not defined in this class; presumably inherited from the base)
      /// on the scheduler and stores the resulting disposable in the serial disposable.
      /// </summary>
      /// <returns>A task that completes once the disposable has been assigned.</returns>
      protected override async Task ScheduleAsync()
      {
          var scheduledWork = await _scheduler.ScheduleAsync(RunAsync).ConfigureAwait(false);

          await _disposable.AssignAsync(scheduledWork).ConfigureAwait(false);
      }
  }
  28. }