ObserveOn.cs 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Disposables;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  9. namespace System.Reactive.Linq
  10. {
  11. partial class AsyncObservable
  12. {
  13. public static IAsyncObservable<TSource> ObserveOn<TSource>(this IAsyncObservable<TSource> source, IAsyncScheduler scheduler)
  14. {
  15. if (source == null)
  16. throw new ArgumentNullException(nameof(source));
  17. if (scheduler == null)
  18. throw new ArgumentNullException(nameof(scheduler));
  19. return Create<TSource>(async observer =>
  20. {
  21. var (sink, drain) = await AsyncObserver.ObserveOn(observer, scheduler).ConfigureAwait(false);
  22. var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
  23. return StableCompositeAsyncDisposable.Create(subscription, drain);
  24. });
  25. }
  26. }
  27. partial class AsyncObserver
  28. {
  29. public static async Task<(IAsyncObserver<TSource>, IAsyncDisposable)> ObserveOn<TSource>(this IAsyncObserver<TSource> observer, IAsyncScheduler scheduler)
  30. {
  31. if (observer == null)
  32. throw new ArgumentNullException(nameof(observer));
  33. if (scheduler == null)
  34. throw new ArgumentNullException(nameof(scheduler));
  35. var semaphore = new SemaphoreSlim(0);
  36. var gate = new AsyncLock();
  37. var queue = new Queue<TSource>();
  38. var error = default(Exception);
  39. var isDone = false;
  40. var drain = await scheduler.ScheduleAsync(async ct =>
  41. {
  42. while (!ct.IsCancellationRequested)
  43. {
  44. await semaphore.WaitAsync(ct).RendezVous(scheduler, ct);
  45. if (queue.Count > 0)
  46. {
  47. var next = queue.Dequeue();
  48. await observer.OnNextAsync(next).RendezVous(scheduler, ct);
  49. }
  50. if (queue.Count == 0)
  51. {
  52. if (isDone)
  53. {
  54. await observer.OnCompletedAsync().RendezVous(scheduler, ct);
  55. break;
  56. }
  57. if (error != null)
  58. {
  59. await observer.OnErrorAsync(error).RendezVous(scheduler, ct);
  60. break;
  61. }
  62. }
  63. }
  64. }).ConfigureAwait(false);
  65. var sink = Create<TSource>(
  66. async x =>
  67. {
  68. using (await gate.LockAsync().ConfigureAwait(false))
  69. {
  70. queue.Enqueue(x);
  71. }
  72. semaphore.Release(1);
  73. },
  74. async ex =>
  75. {
  76. using (await gate.LockAsync().ConfigureAwait(false))
  77. {
  78. error = ex;
  79. }
  80. semaphore.Release(1);
  81. },
  82. async () =>
  83. {
  84. using (await gate.LockAsync().ConfigureAwait(false))
  85. {
  86. isDone = true;
  87. }
  88. semaphore.Release(1);
  89. }
  90. );
  91. return (sink, drain);
  92. }
  93. }
  94. }