RefCount.cs 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Diagnostics;
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Disposables;
  7. using System.Reactive.Subjects;
  8. using System.Threading;
  9. namespace System.Reactive.Linq.ObservableImpl
  10. {
  11. internal static class RefCount<TSource>
  12. {
  /// <summary>
  /// Reference-counting wrapper around a connectable source. It connects once the
  /// observer count of the active connection reaches the configured minimum, and
  /// disconnects as soon as that count drops back to zero (no delay).
  /// </summary>
  internal sealed class Eager : Producer<TSource, Eager._>
  {
      private readonly object _gate = new();
      private readonly IConnectableObservable<TSource> _source;
      private readonly int _minObservers;

      /// <summary>
      /// State of the connection that is currently active, or null when there is
      /// none. Read and write this only while holding the <see cref="_gate"/> lock.
      /// </summary>
      private RefConnection? _connection;

      public Eager(IConnectableObservable<TSource> source, int minObservers)
      {
          _source = source;
          _minObservers = minObservers;
      }

      protected override _ CreateSink(IObserver<TSource> observer)
      {
          return new _(observer, this);
      }

      protected override void Run(_ sink)
      {
          sink.Run();
      }

      /// <summary>
      /// Per-observer sink. It joins the parent's active connection on <see cref="Run"/>
      /// and leaves it again when disposed.
      /// </summary>
      internal sealed class _ : IdentitySink<TSource>
      {
          private readonly Eager _parent;

          /// <summary>
          /// The connection this observer joined. Remembering it here lets subscribing,
          /// connecting and disconnecting all happen outside the parent's lock.
          /// </summary>
          private RefConnection? _targetConnection;

          public _(IObserver<TSource> observer, Eager parent)
              : base(observer)
          {
              _parent = parent;
          }

          /// <summary>
          /// Registers this observer with the active connection (creating one if needed),
          /// subscribes to the source, and connects the source if this observer was the
          /// one that brought the count up to the minimum.
          /// </summary>
          public void Run()
          {
              var owner = _parent;
              RefConnection connection;
              bool reachedThreshold;

              lock (owner._gate)
              {
                  // Reuse the active connection state, or create it and make it the active one.
                  connection = owner._connection ??= new RefConnection();

                  // Connect only when the count hits the minimum AND no connection
                  // disposable has been assigned to this connection state yet.
                  connection._count++;
                  reachedThreshold = connection._count == owner._minObservers
                      && connection._disposable.Disposable is null;

                  // Remember which connection this observer belongs to.
                  _targetConnection = connection;
              }

              // Subscribe before connecting so this observer is in place when the source starts.
              Run(owner._source);

              if (reachedThreshold && !connection._disposable.IsDisposed)
              {
                  // The disposable is stored on the connection state captured above, so if
                  // the source terminates synchronously only that connection is affected and
                  // a connection created by a concurrent re-subscription is left alone.
                  connection._disposable.Disposable = owner._source.Connect();
              }
          }

          /// <summary>
          /// Leaves the connection joined in <see cref="Run"/>; if this was the last
          /// observer of the still-active connection, the source is disconnected.
          /// </summary>
          protected override void Dispose(bool disposing)
          {
              base.Dispose(disposing);

              if (!disposing)
              {
                  return;
              }

              // Take and clear the saved connection. Run always assigns it before Dispose
              // can be reached; per the original note, the base class guards against
              // double-dispose.
              var connection = _targetConnection!;
              _targetConnection = null;

              bool lastObserverLeft;
              lock (_parent._gate)
              {
                  // The count is only decremented when our connection is still the active
                  // one (short-circuit evaluation); otherwise there is nothing to do.
                  lastObserverLeft = ReferenceEquals(connection, _parent._connection)
                      && --connection._count == 0;

                  if (lastObserverLeft)
                  {
                      // Forget the active connection so the next observer starts a fresh one.
                      _parent._connection = null;
                  }
              }

              if (lastObserverLeft)
              {
                  // Disconnect outside the lock.
                  connection._disposable.Dispose();
              }
          }
      }

      /// <summary>
      /// State of a single connection: how many observers are attached to it and
      /// the disposable that tears the connection down.
      /// </summary>
      private sealed class RefConnection
      {
          internal int _count;
          internal SingleAssignmentDisposableValue _disposable;
      }
  }
  /// <summary>
  /// Reference-counting wrapper around a connectable source that, once the observer
  /// count drops to zero, schedules the disconnect on a scheduler after a delay, so
  /// that an observer arriving before the delay elapses can keep the connection alive.
  /// </summary>
  internal sealed class Lazy : Producer<TSource, Lazy._>
  {
      // The state transitions of this operator are easy to get wrong, as bugs #2214 and
      // #2215 testify. The tricky cases are:
      //
      //  * the subscriber count dropping to 0, followed by a new subscriber arriving
      //    before the disconnect delay elapses
      //  * sources that complete before returning from Connect
      //  * sources that produce notifications without waiting for Connect (especially
      //    if they call OnCompleted before returning from Subscribe)
      //
      // Multithreading complicates this further. Although Rx requires notifications to
      // an individual observer to be sequential, concurrency can occur here for two
      // reasons:
      //
      //  * Each subscription to RefCount causes a subscription to the underlying source
      //    (RefCount only aggregates the calls to Connect). In the common
      //    source.Publish().RefCount() usage these subscriptions all go to a single
      //    subject, so in practice there are no concurrent source notifications. But
      //    RefCount works with any IConnectableObservable<T>, and connectable
      //    observables are not in general required to synchronize notifications across
      //    all subscribers of a connection. Because RefCount has to detect sources
      //    completing by themselves, it must cope with concurrent completions.
      //  * New Subscribe calls to RefCount can happen concurrently with notifications
      //    for existing subscriptions, or concurrently with other Subscribe calls.
      //    (Strictly speaking the latter was never promised to work: the documentation
      //    does not say that multiple threads may call Subscribe concurrently on a
      //    single RefCount. Historically such calls have always been guarded against,
      //    so for backwards compatibility that must continue.)
      //
      // Each call to RefCount(disconnectDelay) creates one instance of this Lazy class,
      // and each subscriber gets one instance of the nested sink class Lazy._. The outer
      // Lazy instance is always in one of the states described by the State enumeration.
      //
      // A lock protects against concurrency, but not against re-entrancy: calling
      // Subscribe or Connect can complete subscribers (ones already set up, or the one
      // being set up by that very call). The completion logic then tries to take the
      // same lock and succeeds, because lock acquisition is re-entrant (the alternative
      // would be deadlock or failure). The result is one block of code that owns the
      // lock modifying the protected data in the middle of another block that also
      // owns it, which is exactly what a lock is normally expected to prevent. Calls
      // that can trigger such re-entrancy are therefore avoided while shared state is
      // being updated where possible; where user code has to be called while holding
      // _gate, remember that the state may have changed during that call.

      private readonly object _gate = new();
      private readonly IConnectableObservable<TSource> _source;
      private readonly IScheduler _scheduler;
      private readonly TimeSpan _disconnectTime;
      private readonly int _minObservers;

      // Current state of the state machine below; written only while holding _gate.
      private State _state;

      // Number of observers currently counted against this instance; written only while holding _gate.
      private int _count;

      // The SingleAssignmentDisposable reserved for (or already carrying) the delayed
      // disconnect work item. Replacing it cancels a pending disconnect.
      private IDisposable? _serial;

      // The disposable returned by the most recent Connect call; null after a disconnect.
      private IDisposable? _connectableSubscription;

      /// <summary>
      /// The state of a <see cref="Lazy"/> instance, shared by all subscriptions made
      /// through that instance.
      /// </summary>
      private enum State
      {
          /// <summary>
          /// Disconnected with 0 subscribers. This is the initial state, and also the
          /// state returned to after the subscriber count drops to zero and the
          /// disconnect delay elapses without further subscriptions being added.
          /// </summary>
          DisconnectedNoSubscribers,

          /// <summary>
          /// Disconnected with 0 &lt; subscribers &lt; minObservers. This state is
          /// entered when the first subscriber arrives and minObservers is >= 2.
          /// </summary>
          DisconnectedWithSubscribers,

          /// <summary>
          /// Connected with at least one subscriber. This state is entered when the
          /// number of subscribers first reaches minObservers (or reaches it again
          /// after a disconnect). If minObservers = 1, it is entered from
          /// <see cref="DisconnectedNoSubscribers"/> as soon as a subscriber arrives.
          /// </summary>
          ConnectedWithSubscribers,

          /// <summary>
          /// Connected with 0 subscribers, waiting either for the disconnect delay to
          /// elapse or for a new subscription to come in before it does.
          /// </summary>
          ConnectedWithNoSubscribers
      }

      public Lazy(IConnectableObservable<TSource> source, TimeSpan disconnectTime, IScheduler scheduler, int minObservers)
      {
          _source = source;
          _scheduler = scheduler;
          _disconnectTime = disconnectTime;
          _minObservers = minObservers;
          _state = State.DisconnectedNoSubscribers;
      }

      protected override _ CreateSink(IObserver<TSource> observer)
      {
          return new _(observer);
      }

      protected override void Run(_ sink)
      {
          sink.Run(this);
      }

      /// <summary>
      /// Per-observer sink. It subscribes to the source, updates the parent's shared
      /// state, and installs an upstream disposable that undoes both.
      /// </summary>
      internal sealed class _ : IdentitySink<TSource>
      {
          public _(IObserver<TSource> observer)
              : base(observer)
          {
          }

          /// <summary>
          /// Subscribes this sink to the parent's source, counts it as an observer,
          /// connects the source when the threshold is reached (or cancels a pending
          /// delayed disconnect), and finally registers the upstream disposable.
          /// </summary>
          /// <param name="parent">The <see cref="Lazy"/> instance this sink belongs to.</param>
          public void Run(Lazy parent)
          {
              // The source may complete synchronously, in which case this sink is already
              // disposed for the rest of the method. That is expected; keep it in mind
              // when changing anything below.
              var sourceSubscription = parent._source.SubscribeSafe(this);

              lock (parent._gate)
              {
                  var subscriberCount = ++parent._count;
                  var connectNow = false;
                  var cancelPendingDisconnect = false;

                  var stateOnEntry = parent._state;
                  if (stateOnEntry == State.DisconnectedNoSubscribers
                      || stateOnEntry == State.DisconnectedWithSubscribers)
                  {
                      Debug.Assert(subscriberCount <= parent._minObservers, "RefCount should never exceed minObservers without already having connected");
                      connectNow = subscriberCount == parent._minObservers;
                      parent._state = connectNow
                          ? State.ConnectedWithSubscribers
                          : State.DisconnectedWithSubscribers;
                  }
                  else if (stateOnEntry == State.ConnectedWithNoSubscribers)
                  {
                      cancelPendingDisconnect = true;
                      parent._state = State.ConnectedWithSubscribers;
                  }
                  // Otherwise the state is ConnectedWithSubscribers and nothing more is needed.

                  if (connectNow)
                  {
                      Debug.Assert(parent._connectableSubscription is null, "RefCount already connected when it should not be");
                      parent._connectableSubscription = parent._source.Connect();

                      // Connect can make the underlying source complete. Subscribers that
                      // were already set up (e.g. minObservers > 1 and this subscription is
                      // the one that reached the threshold) are then completed inside that
                      // call, which runs the upstream disposables they registered. That
                      // callback (OnUnsubscribed below) takes the same _gate lock held here,
                      // so fields may have changed during the call even though the lock is
                      // owned.
                      //
                      // _state should not have changed, though: the callback only leaves the
                      // connected-with-subscribers state when the observer count drops to
                      // zero, and the count includes the subscription being established
                      // right now, whose upstream disposable has not been registered yet.
                      Debug.Assert(parent._state == State.ConnectedWithSubscribers, "Unexpected state change in Connect");
                  }

                  if (connectNow || cancelPendingDisconnect)
                  {
                      // A delayed disconnect work item that has been scheduled is held by
                      // _serial, so replacing _serial cancels it. Either way this leaves an
                      // unused SingleAssignmentDisposable in place for the upstream disposal
                      // callback to use when it has to schedule the delayed disconnect.
                      Disposable.TrySetSerial(ref parent._serial, new SingleAssignmentDisposable());
                  }
              }

              SetUpstream(Disposable.Create(
                  (parent, sourceSubscription),
                  static state =>
                  {
                      OnUnsubscribed(state.parent, state.sourceSubscription);
                  }));
          }

          /// <summary>
          /// Upstream disposal logic for one observer: disposes its source subscription,
          /// decrements the observer count and, when the count reaches zero, either
          /// schedules the delayed disconnect or records that there are no subscribers.
          /// </summary>
          private static void OnUnsubscribed(Lazy closureParent, IDisposable sourceSubscription)
          {
              sourceSubscription.Dispose();

              lock (closureParent._gate)
              {
                  if (--closureParent._count != 0)
                  {
                      // Other observers remain; nothing else to do.
                      return;
                  }

                  // The count can reach 0 without ever having reached the minObservers
                  // threshold, in which case Connect was never called. More subtly, when
                  // a source calls OnCompleted inside Subscribe, this logic can run
                  // *inside* the Connect call made by Run. The disconnect work item is
                  // therefore scheduled only when a connection has actually been made.
                  if (closureParent._state != State.ConnectedWithSubscribers)
                  {
                      // This logic only runs while there is at least one subscriber, so
                      // when the state is not ConnectedWithSubscribers it should be
                      // DisconnectedWithSubscribers.
                      Debug.Assert(closureParent._state == State.DisconnectedWithSubscribers);
                      closureParent._state = State.DisconnectedNoSubscribers;
                      return;
                  }

                  closureParent._state = State.ConnectedWithNoSubscribers;

                  // NB: _serial is guaranteed to have been set by TrySetSerial in Run.
                  var cancelable = (SingleAssignmentDisposable)Volatile.Read(ref closureParent._serial)!;
                  cancelable.Disposable = closureParent._scheduler.ScheduleAction(
                      (cancelable, closureParent),
                      closureParent._disconnectTime,
                      static pending =>
                      {
                          DisconnectUnlessCancelled(pending.closureParent, pending.cancelable);
                      });
              }
          }

          /// <summary>
          /// Body of the delayed disconnect work item. It disconnects from the source
          /// only if <paramref name="scheduledWith"/> is still the disposable held in
          /// the parent's _serial field, i.e. the work item has not been superseded.
          /// </summary>
          private static void DisconnectUnlessCancelled(Lazy parent, SingleAssignmentDisposable scheduledWith)
          {
              lock (parent._gate)
              {
                  if (!ReferenceEquals(Volatile.Read(ref parent._serial), scheduledWith))
                  {
                      return;
                  }

                  parent._state = State.DisconnectedNoSubscribers;

                  // NB: _connectableSubscription is guaranteed to have been set by Run when
                  // it connected; per the original note, Disposable.Create protects against
                  // double-dispose.
                  var connection = parent._connectableSubscription!;
                  parent._connectableSubscription = null;
                  connection.Dispose();
              }
          }
      }
  }
  329. }
  330. }