RefCount.cs 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. using System.Reactive.Subjects;
  7. using System.Threading;
  8. namespace System.Reactive.Linq.ObservableImpl
  9. {
  10. internal static class RefCount<TSource>
  11. {
  12. internal sealed class Eager : Producer<TSource, Eager._>
  13. {
  14. private readonly IConnectableObservable<TSource> _source;
  15. private readonly object _gate;
  16. private int _count;
  17. private IDisposable _connectableSubscription;
  18. public Eager(IConnectableObservable<TSource> source)
  19. {
  20. _source = source;
  21. _gate = new object();
  22. _count = 0;
  23. _connectableSubscription = default(IDisposable);
  24. }
  25. protected override _ CreateSink(IObserver<TSource> observer) => new _(observer, this);
  26. protected override void Run(_ sink) => sink.Run();
  27. internal sealed class _ : IdentitySink<TSource>
  28. {
  29. readonly Eager _parent;
  30. public _(IObserver<TSource> observer, Eager parent)
  31. : base(observer)
  32. {
  33. this._parent = parent;
  34. }
  35. public void Run()
  36. {
  37. base.Run(_parent._source);
  38. lock (_parent._gate)
  39. {
  40. if (++_parent._count == 1)
  41. {
  42. // We need to set _connectableSubscription to something
  43. // before Connect because if Connect terminates synchronously,
  44. // Dispose(bool) gets executed and will try to dispose
  45. // _connectableSubscription of null.
  46. // ?.Dispose() is no good because the dispose action has to be
  47. // executed anyway.
  48. // We can't inline SAD either because the IDisposable of Connect
  49. // may belong to the wrong connection.
  50. var sad = new SingleAssignmentDisposable();
  51. _parent._connectableSubscription = sad;
  52. sad.Disposable = _parent._source.Connect();
  53. }
  54. }
  55. }
  56. protected override void Dispose(bool disposing)
  57. {
  58. base.Dispose(disposing);
  59. if (disposing)
  60. {
  61. lock (_parent._gate)
  62. {
  63. if (--_parent._count == 0)
  64. {
  65. _parent._connectableSubscription.Dispose();
  66. }
  67. }
  68. }
  69. }
  70. }
  71. }
  72. internal sealed class Lazy : Producer<TSource, Lazy._>
  73. {
  74. private readonly object _gate;
  75. private readonly IScheduler _scheduler;
  76. private readonly TimeSpan _disconnectTime;
  77. private readonly IConnectableObservable<TSource> _source;
  78. private IDisposable _serial;
  79. private int _count;
  80. private IDisposable _connectableSubscription;
  81. public Lazy(IConnectableObservable<TSource> source, TimeSpan disconnectTime, IScheduler scheduler)
  82. {
  83. _source = source;
  84. _gate = new object();
  85. _disconnectTime = disconnectTime;
  86. _scheduler = scheduler;
  87. _count = 0;
  88. _connectableSubscription = default(IDisposable);
  89. }
  90. protected override _ CreateSink(IObserver<TSource> observer) => new _(observer);
  91. protected override void Run(_ sink) => sink.Run(this);
  92. internal sealed class _ : IdentitySink<TSource>
  93. {
  94. public _(IObserver<TSource> observer)
  95. : base(observer)
  96. {
  97. }
  98. public void Run(Lazy parent)
  99. {
  100. var subscription = parent._source.SubscribeSafe(this);
  101. lock (parent._gate)
  102. {
  103. if (++parent._count == 1)
  104. {
  105. if (parent._connectableSubscription == null)
  106. parent._connectableSubscription = parent._source.Connect();
  107. Disposable.TrySetSerial(ref parent._serial, new SingleAssignmentDisposable());
  108. }
  109. }
  110. SetUpstream(Disposable.Create(
  111. (parent, subscription),
  112. tuple =>
  113. {
  114. var (closureParent, closureSubscription) = tuple;
  115. closureSubscription.Dispose();
  116. lock (closureParent._gate)
  117. {
  118. if (--closureParent._count == 0)
  119. {
  120. var cancelable = (SingleAssignmentDisposable)Volatile.Read(ref closureParent._serial);
  121. cancelable.Disposable = closureParent._scheduler.Schedule((cancelable, closureParent), closureParent._disconnectTime, (self, tuple2) =>
  122. {
  123. lock (tuple2.closureParent._gate)
  124. {
  125. if (object.ReferenceEquals(Volatile.Read(ref tuple2.closureParent._serial), tuple2.cancelable))
  126. {
  127. tuple2.closureParent._connectableSubscription.Dispose();
  128. tuple2.closureParent._connectableSubscription = null;
  129. }
  130. }
  131. return Disposable.Empty;
  132. });
  133. }
  134. }
  135. }));
  136. }
  137. }
  138. }
  139. }
  140. }