SimpleAsyncSubject.cs 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Disposables;
  6. using System.Threading.Tasks;
  7. namespace System.Reactive.Subjects
  8. {
  9. public abstract class SimpleAsyncSubject<T> : IAsyncSubject<T>
  10. {
  11. private readonly object _gate = new();
  12. private readonly List<IAsyncObserver<T>> _observers = new();
  13. private bool _done;
  14. private Exception _error;
  15. public ValueTask OnCompletedAsync()
  16. {
  17. IAsyncObserver<T>[] observers;
  18. lock (_gate)
  19. {
  20. if (_done || _error != null)
  21. {
  22. return default;
  23. }
  24. _done = true;
  25. observers = _observers.ToArray();
  26. }
  27. return OnCompletedAsyncCore(observers);
  28. }
  29. protected abstract ValueTask OnCompletedAsyncCore(IEnumerable<IAsyncObserver<T>> observers);
  30. public ValueTask OnErrorAsync(Exception error)
  31. {
  32. if (error == null)
  33. throw new ArgumentNullException(nameof(error));
  34. IAsyncObserver<T>[] observers;
  35. lock (_gate)
  36. {
  37. if (_done || _error != null)
  38. {
  39. return default;
  40. }
  41. _error = error;
  42. observers = _observers.ToArray();
  43. }
  44. return OnErrorAsyncCore(observers, error);
  45. }
  46. protected abstract ValueTask OnErrorAsyncCore(IEnumerable<IAsyncObserver<T>> observers, Exception error);
  47. public ValueTask OnNextAsync(T value)
  48. {
  49. IAsyncObserver<T>[] observers;
  50. lock (_gate)
  51. {
  52. if (_done || _error != null)
  53. {
  54. return default;
  55. }
  56. observers = _observers.ToArray();
  57. }
  58. return OnNextAsyncCore(observers, value);
  59. }
  60. protected abstract ValueTask OnNextAsyncCore(IEnumerable<IAsyncObserver<T>> observers, T value);
  61. public async ValueTask<IAsyncDisposable> SubscribeAsync(IAsyncObserver<T> observer)
  62. {
  63. if (observer == null)
  64. throw new ArgumentNullException(nameof(observer));
  65. bool done;
  66. Exception error;
  67. lock (_gate)
  68. {
  69. done = _done;
  70. error = _error;
  71. if (!done && error == null)
  72. {
  73. _observers.Add(observer);
  74. }
  75. }
  76. if (done)
  77. {
  78. await observer.OnCompletedAsync().ConfigureAwait(false);
  79. return AsyncDisposable.Nop;
  80. }
  81. else if (error != null)
  82. {
  83. await observer.OnErrorAsync(error).ConfigureAwait(false);
  84. return AsyncDisposable.Nop;
  85. }
  86. else
  87. {
  88. return AsyncDisposable.Create(() =>
  89. {
  90. lock (_gate)
  91. {
  92. var i = _observers.LastIndexOf(observer);
  93. if (i >= 0)
  94. {
  95. _observers.RemoveAt(i);
  96. }
  97. }
  98. return default;
  99. });
  100. }
  101. }
  102. }
  103. }