AsyncObservableBase.cs 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.
  4. using System.Threading.Tasks;
  5. namespace System.Reactive
  6. {
  /// <summary>
  /// Base class for asynchronous observable sequences. Observers subscribed through <see cref="SubscribeAsync"/>
  /// are wrapped in an auto-detach observer which stops forwarding notifications once it has been disposed or
  /// has forwarded a terminal notification, and which disposes the subscription returned by
  /// <see cref="SubscribeAsyncCore"/> at that point.
  /// </summary>
  /// <typeparam name="T">The type of the elements in the sequence.</typeparam>
  public abstract class AsyncObservableBase<T> : IAsyncObservable<T>
  {
      /// <summary>
      /// Subscribes the specified observer to the sequence.
      /// </summary>
      /// <param name="observer">The observer to receive notifications.</param>
      /// <returns>
      /// A task producing the auto-detach wrapper created around <paramref name="observer"/>. Disposing it stops
      /// the forwarding of notifications and disposes the subscription obtained from <see cref="SubscribeAsyncCore"/>.
      /// </returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="observer"/> is <c>null</c>. Because this method is <c>async</c>, the exception is surfaced
      /// through the returned task rather than thrown synchronously.
      /// </exception>
      public async ValueTask<IAsyncDisposable> SubscribeAsync(IAsyncObserver<T> observer)
      {
          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          var autoDetach = new AutoDetachAsyncObserver(observer);

          // The core implementation only ever sees the wrapper, never the caller's observer.
          var subscription = await SubscribeAsyncCore(autoDetach).ConfigureAwait(false);

          // Hand the subscription to the wrapper before returning it, so that the wrapper is either holding
          // the subscription or has already disposed it by the time the caller can call DisposeAsync.
          await autoDetach.AssignAsync(subscription).ConfigureAwait(false);

          return autoDetach;
      }

      /// <summary>
      /// Implements the subscription logic. Invoked by <see cref="SubscribeAsync"/> with an auto-detach wrapper
      /// around the observer that was passed by the caller.
      /// </summary>
      /// <param name="observer">The (wrapped) observer to send notifications to.</param>
      /// <returns>A task producing the disposable used to tear down the subscription.</returns>
      protected abstract ValueTask<IAsyncDisposable> SubscribeAsyncCore(IAsyncObserver<T> observer);

      /// <summary>
      /// Observer wrapper that forwards notifications to an underlying observer until disposal has been
      /// initiated, and that owns the disposal of the subscription assigned through <see cref="AssignAsync"/>.
      /// </summary>
      private sealed class AutoDetachAsyncObserver : AsyncObserverBase<T>, IAsyncDisposable
      {
          private readonly IAsyncObserver<T> _observer;

          // Guards _subscription, _task and _disposing.
          private readonly object _gate = new object();

          // Subscription handed over by AssignAsync; stays null until then, or forever if disposal started first.
          private IAsyncDisposable _subscription;

          // Task returned by the outgoing On*Async call that is currently being awaited; default when idle.
          private ValueTask _task;

          // Set once by DisposeAsync or FinishAsync; from then on no notification is forwarded.
          private bool _disposing;

          /// <summary>
          /// Creates a wrapper forwarding notifications to <paramref name="observer"/>.
          /// </summary>
          /// <param name="observer">The observer to forward notifications to.</param>
          public AutoDetachAsyncObserver(IAsyncObserver<T> observer)
          {
              _observer = observer;
          }

          /// <summary>
          /// Hands the subscription to this wrapper. When disposal has already been initiated (for example
          /// because a terminal notification was forwarded before the subscription became available), the
          /// subscription is disposed right away instead of being stored.
          /// </summary>
          /// <param name="subscription">The subscription to take ownership of.</param>
          public async ValueTask AssignAsync(IAsyncDisposable subscription)
          {
              var shouldDispose = false;

              lock (_gate)
              {
                  if (_disposing)
                  {
                      shouldDispose = true;
                  }
                  else
                  {
                      _subscription = subscription;
                  }
              }

              // A null subscription is skipped here, just like DisposeAsync and FinishAsync skip a null
              // _subscription, so that the outcome does not depend on whether disposal started before or
              // after the assignment.
              if (shouldDispose && subscription != null)
              {
                  await subscription.DisposeAsync().ConfigureAwait(false);
              }
          }

          /// <summary>
          /// Stops the forwarding of notifications and disposes the assigned subscription, if any. Only the
          /// first caller to initiate disposal (this method or <see cref="FinishAsync"/>) disposes the
          /// subscription; subsequent calls do nothing.
          /// </summary>
          public async ValueTask DisposeAsync()
          {
              var subscription = default(IAsyncDisposable);

              lock (_gate)
              {
                  //
                  // NB: The intended postcondition of awaiting the first DisposeAsync call is that all message
                  //     processing has ceased, i.e. no further On*AsyncCore calls will forward to the observer.
                  //     Setting _disposing to true, which is checked by the On*AsyncCore calls upon entry,
                  //     prevents any new outgoing call from being started. An outgoing call that is already
                  //     in flight is NOT awaited at present; see the BUGBUG note below.
                  //
                  //     Timing of the disposal of the subscription is less deterministic due to the intersection
                  //     with the AssignAsync code path. However, the auto-detach observer can only be returned
                  //     from the SubscribeAsync call *after* a call to AssignAsync has been made and awaited, so
                  //     either AssignAsync triggers the disposal and an already disposed instance is returned, or
                  //     the user calling DisposeAsync will either encounter a busy observer which will be stopped
                  //     in its tracks (as described above) or it will trigger a disposal of the subscription.
                  //

                  if (!_disposing)
                  {
                      _disposing = true;
                      subscription = _subscription;
                  }
              }

              //
              // BUGBUG: Outstanding work tracked in _task is currently not awaited before the subscription is
              //         disposed. Awaiting it causes grief when an outgoing On*Async call reenters the
              //         DisposeAsync method: the task returned from that very On*Async call would end up being
              //         awaited in order to serialize the call to subscription.DisposeAsync after it's done.
              //         We need to either detect reentrancy and queue up the call to DisposeAsync, or follow an
              //         approach where we trigger the disposal without awaiting outstanding work (thus allowing
              //         for concurrency). Note that _task is a ValueTask that is already awaited by the
              //         On*AsyncCore method that started it, so it cannot simply be awaited a second time here.
              //

              if (subscription != null)
              {
                  await subscription.DisposeAsync().ConfigureAwait(false);
              }
          }

          /// <summary>
          /// Forwards the completion notification unless disposal has been initiated, then finishes the
          /// observer (see <see cref="FinishAsync"/>) regardless of whether the forwarded call succeeded.
          /// </summary>
          protected override async ValueTask OnCompletedAsyncCore()
          {
              lock (_gate)
              {
                  if (_disposing)
                  {
                      return;
                  }

                  // Started under the gate so the call cannot begin after DisposeAsync has set _disposing.
                  _task = _observer.OnCompletedAsync();
              }

              try
              {
                  await _task.ConfigureAwait(false);
              }
              finally
              {
                  await FinishAsync().ConfigureAwait(false);
              }
          }

          /// <summary>
          /// Forwards the error notification unless disposal has been initiated, then finishes the observer
          /// (see <see cref="FinishAsync"/>) regardless of whether the forwarded call succeeded.
          /// </summary>
          /// <param name="error">The error to forward.</param>
          protected override async ValueTask OnErrorAsyncCore(Exception error)
          {
              lock (_gate)
              {
                  if (_disposing)
                  {
                      return;
                  }

                  // Started under the gate so the call cannot begin after DisposeAsync has set _disposing.
                  _task = _observer.OnErrorAsync(error);
              }

              try
              {
                  await _task.ConfigureAwait(false);
              }
              finally
              {
                  await FinishAsync().ConfigureAwait(false);
              }
          }

          /// <summary>
          /// Forwards an element to the underlying observer unless disposal has been initiated.
          /// </summary>
          /// <param name="value">The element to forward.</param>
          protected override async ValueTask OnNextAsyncCore(T value)
          {
              lock (_gate)
              {
                  if (_disposing)
                  {
                      return;
                  }

                  // Started under the gate so the call cannot begin after DisposeAsync has set _disposing.
                  _task = _observer.OnNextAsync(value);
              }

              try
              {
                  await _task.ConfigureAwait(false);
              }
              finally
              {
                  // The outgoing call is over (successfully or not); nothing is in flight anymore.
                  lock (_gate)
                  {
                      _task = default;
                  }
              }
          }

          /// <summary>
          /// Called after a terminal notification has been forwarded. Initiates disposal if nobody else has
          /// done so yet and disposes the subscription when it has already been assigned. When it has not been
          /// assigned yet, <see cref="AssignAsync"/> observes the disposing state and disposes it instead.
          /// </summary>
          private async ValueTask FinishAsync()
          {
              var subscription = default(IAsyncDisposable);

              lock (_gate)
              {
                  if (!_disposing)
                  {
                      _disposing = true;
                      subscription = _subscription;
                  }

                  _task = default;
              }

              if (subscription != null)
              {
                  await subscription.DisposeAsync().ConfigureAwait(false);
              }
          }
      }
  }
  178. }