AsyncJoinObserver.cs 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Disposables;
  6. using System.Reactive.Linq;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  9. namespace System.Reactive.Joins
  10. {
  /// <summary>
  /// Observer used by join patterns: it subscribes to a materialized view of a source sequence,
  /// buffers the incoming notifications in <see cref="Queue"/>, and asks every registered
  /// active plan to attempt a match whenever a new notification is queued.
  /// </summary>
  /// <typeparam name="T">Element type of the observed source sequence.</typeparam>
  internal sealed class AsyncJoinObserver<T> : AsyncObserverBase<Notification<T>>, IAsyncJoinObserver
  {
      private readonly IAsyncObservable<T> _source;
      private readonly Func<Exception, ValueTask> _onError;
      private readonly List<ActiveAsyncPlan> _activePlans = new List<ActiveAsyncPlan>();
      private readonly SingleAssignmentAsyncDisposable _subscription = new SingleAssignmentAsyncDisposable();

      // Assigned by SubscribeAsync; OnNextAsyncCore locks it around all notification processing.
      private AsyncGate _gate;

      // Set once the subscription has been disposed; later notifications are then ignored.
      private bool _isDisposed;

      /// <summary>
      /// Creates an observer for the given source. Nothing is subscribed until
      /// <see cref="SubscribeAsync"/> is called.
      /// </summary>
      /// <param name="source">Sequence whose notifications are queued for plan matching.</param>
      /// <param name="onError">Callback invoked with the exception of an error notification.</param>
      public AsyncJoinObserver(IAsyncObservable<T> source, Func<Exception, ValueTask> onError)
      {
          _source = source;
          _onError = onError;
      }

      /// <summary>
      /// Notifications received so far that have not been dequeued yet.
      /// </summary>
      public Queue<Notification<T>> Queue { get; } = new Queue<Notification<T>>();

      /// <summary>
      /// Removes (and discards) the notification at the head of <see cref="Queue"/>.
      /// </summary>
      public void Dequeue()
      {
          Queue.Dequeue();
      }

      /// <summary>
      /// Registers a plan to be asked for a match on every queued notification.
      /// </summary>
      /// <param name="activePlan">The plan to register.</param>
      public void AddActivePlan(ActiveAsyncPlan activePlan) => _activePlans.Add(activePlan);

      /// <summary>
      /// Unregisters a plan; once no plans remain, the observer disposes itself.
      /// </summary>
      /// <param name="activePlan">The plan to unregister.</param>
      internal async Task RemoveActivePlan(ActiveAsyncPlan activePlan)
      {
          _activePlans.Remove(activePlan);

          if (_activePlans.Count != 0)
          {
              return;
          }

          // Nobody is left to consume notifications, so release the subscription.
          await DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Disposes the subscription held by this observer. Does nothing once the observer
      /// has been marked as disposed.
      /// </summary>
      public async ValueTask DisposeAsync()
      {
          if (_isDisposed)
          {
              return;
          }

          await _subscription.DisposeAsync().ConfigureAwait(false);

          // The flag is only flipped after the subscription's disposal has completed.
          _isDisposed = true;
      }

      /// <summary>
      /// Stores the gate used to serialize notification processing and subscribes this
      /// observer to the materialized source sequence.
      /// </summary>
      /// <param name="gate">Gate locked while a notification is being processed.</param>
      public async Task SubscribeAsync(AsyncGate gate)
      {
          _gate = gate;

          var materialized = _source.Materialize();
          var subscription = await materialized.SubscribeSafeAsync(this).ConfigureAwait(false);

          await _subscription.AssignAsync(subscription).ConfigureAwait(false);
      }

      /// <summary>
      /// No-op. The source is subscribed through <c>Materialize()</c>, so completion is
      /// expected to arrive as a notification through <see cref="OnNextAsyncCore"/>.
      /// </summary>
      protected override ValueTask OnCompletedAsyncCore()
      {
          return default;
      }

      /// <summary>
      /// No-op. The source is subscribed through <c>Materialize()</c>, so errors are
      /// expected to arrive as notifications through <see cref="OnNextAsyncCore"/>.
      /// </summary>
      /// <param name="error">Ignored.</param>
      protected override ValueTask OnErrorAsyncCore(Exception error)
      {
          return default;
      }

      /// <summary>
      /// Processes one materialized notification while holding the gate: error notifications
      /// are forwarded to the error callback, anything else is queued and offered to every
      /// active plan. Notifications arriving after disposal are dropped.
      /// </summary>
      /// <param name="notification">The materialized notification from the source.</param>
      protected override async ValueTask OnNextAsyncCore(Notification<T> notification)
      {
          using (await _gate.LockAsync().ConfigureAwait(false))
          {
              if (_isDisposed)
              {
                  return;
              }

              if (notification.Kind == NotificationKind.OnError)
              {
                  await _onError(notification.Exception).ConfigureAwait(false);
                  return;
              }

              Queue.Enqueue(notification);

              // Iterate over a snapshot: the live plan list can be modified through
              // AddActivePlan/RemoveActivePlan while a match is being awaited.
              foreach (var plan in _activePlans.ToArray())
              {
                  await plan.Match().ConfigureAwait(false); // REVIEW: Consider concurrent matching.
              }
          }
      }
  }
  77. }