TakeUntil.cs 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. namespace System.Reactive.Linq.ObservableImpl
  7. {
  /// <summary>
  /// Producer for the TakeUntil operator that uses a second observable as its stop signal.
  /// Source notifications are forwarded until the other sequence produces an element
  /// (downstream is completed) or an error (downstream receives that error).
  /// </summary>
  /// <typeparam name="TSource">Element type of the forwarded source sequence.</typeparam>
  /// <typeparam name="TOther">Element type of the signalling sequence; its values are never inspected.</typeparam>
  internal sealed class TakeUntil<TSource, TOther> : Producer<TSource>
  {
      private readonly IObservable<TSource> _source;
      private readonly IObservable<TOther> _other;

      /// <summary>Captures the two sequences; nothing is subscribed until <c>Run</c> is invoked.</summary>
      /// <param name="source">Sequence whose notifications are forwarded.</param>
      /// <param name="other">Sequence whose first element (or error) ends the forwarding.</param>
      public TakeUntil(IObservable<TSource> source, IObservable<TOther> other)
      {
          _source = source;
          _other = other;
      }

      /// <summary>Builds the per-subscription sink, publishes it through <paramref name="setSink"/> and starts it.</summary>
      /// <returns>The disposable returned by the sink's <c>Run</c> method.</returns>
      protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var takeUntilSink = new _(this, observer, cancel);
          setSink(takeUntilSink);
          return takeUntilSink.Run();
      }

      /// <summary>
      /// Per-subscription sink. The sink instance itself is the monitor on which both nested
      /// observers lock while they talk to the downstream observer.
      /// </summary>
      private sealed class _ : Sink<TSource>
      {
          private readonly TakeUntil<TSource, TOther> _parent;

          public _(TakeUntil<TSource, TOther> parent, IObserver<TSource> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              _parent = parent;
          }

          /// <summary>Subscribes to both sequences and returns a composite of the two subscriptions.</summary>
          public IDisposable Run()
          {
              var sourceWatcher = new SourceObserver(this);
              var otherWatcher = new OtherObserver(this, sourceWatcher);

              // COMPAT - Order of Subscribe calls per v1.0.10621: "other" first, then "source".
              var otherHandle = _parent._other.SubscribeSafe(otherWatcher);
              otherWatcher.Disposable = otherHandle;

              var sourceHandle = _parent._source.SubscribeSafe(sourceWatcher);

              return StableCompositeDisposable.Create(otherHandle, sourceHandle);
          }

          /*
           * A more fine-grained synchronization scheme was tried to make TakeUntil more efficient,
           * but it requires several CAS instructions, which quickly add up to being non-beneficial.
           *
           * Note that an approach where the "other" channel performs an Interlocked.Exchange on the
           * _parent._observer field to swap in a NopObserver<TSource> doesn't work: the "other"
           * channel still needs to send an OnCompleted message, and that could happen concurrently
           * with another message when the "source" channel has already read the _parent._observer
           * field before making its On* call.
           *
           * Fixing this requires an ownership transfer mechanism that gives a channel exclusive
           * access to the outgoing observer while it dispatches a message. Doing that at a finer
           * grain than plain locks turns out to be tricky and doesn't reduce cost.
           */

          /// <summary>Observer attached to the source sequence.</summary>
          private sealed class SourceObserver : IObserver<TSource>
          {
              private readonly _ _sink;

              // Starts out false; flipped by Open() once the other sequence has completed.
              private volatile bool _open;

              public SourceObserver(_ sink)
              {
                  _sink = sink;
              }

              /// <summary>Lets subsequent OnNext calls forward their value without taking the lock.</summary>
              public void Open()
              {
                  _open = true;
              }

              public void OnNext(TSource value)
              {
                  if (_open)
                  {
                      // Lock-free path, taken once Open() has been called.
                      _sink._observer.OnNext(value);
                      return;
                  }

                  lock (_sink)
                  {
                      _sink._observer.OnNext(value);
                  }
              }

              public void OnError(Exception error)
              {
                  lock (_sink)
                  {
                      _sink._observer.OnError(error);
                      _sink.Dispose();
                  }
              }

              public void OnCompleted()
              {
                  lock (_sink)
                  {
                      _sink._observer.OnCompleted();
                      _sink.Dispose();
                  }
              }
          }

          /// <summary>Observer attached to the other (signalling) sequence.</summary>
          private sealed class OtherObserver : IObserver<TOther>
          {
              private readonly _ _sink;
              private readonly SourceObserver _sourceWatcher;
              private readonly SingleAssignmentDisposable _subscription = new SingleAssignmentDisposable();

              public OtherObserver(_ sink, SourceObserver sourceWatcher)
              {
                  _sink = sink;
                  _sourceWatcher = sourceWatcher;
              }

              /// <summary>Receives the subscription to the other sequence so OnCompleted can dispose it.</summary>
              public IDisposable Disposable
              {
                  set { _subscription.Disposable = value; }
              }

              // Any element from the other sequence ends the output with OnCompleted.
              public void OnNext(TOther value)
              {
                  lock (_sink)
                  {
                      _sink._observer.OnCompleted();
                      _sink.Dispose();
                  }
              }

              // An error from the other sequence is passed downstream.
              public void OnError(Exception error)
              {
                  lock (_sink)
                  {
                      _sink._observer.OnError(error);
                      _sink.Dispose();
                  }
              }

              // The other sequence finished without producing an element: open the source
              // observer's lock-free path and dispose only the subscription to "other".
              public void OnCompleted()
              {
                  lock (_sink)
                  {
                      _sourceWatcher.Open();
                      _subscription.Dispose();
                  }
              }
          }
      }
  }
  /// <summary>
  /// Producer for the TakeUntil operator that stops at an absolute point in time: source
  /// notifications are forwarded until the action scheduled for the end time runs, at which
  /// point the downstream observer is completed.
  /// </summary>
  /// <typeparam name="TSource">Element type of the forwarded source sequence.</typeparam>
  internal sealed class TakeUntil<TSource> : Producer<TSource>
  {
      private readonly IObservable<TSource> _source;
      private readonly DateTimeOffset _endTime;

      // NOTE(review): internal rather than private - presumably read by the operator layer when
      // it decides whether consecutive TakeUntil calls can be merged via Omega; confirm against callers.
      internal readonly IScheduler _scheduler;

      /// <summary>Captures the operator arguments; nothing is scheduled or subscribed until <c>Run</c>.</summary>
      /// <param name="source">Sequence whose notifications are forwarded.</param>
      /// <param name="endTime">Time at which the completion action is scheduled.</param>
      /// <param name="scheduler">Scheduler used to schedule the completion action.</param>
      public TakeUntil(IObservable<TSource> source, DateTimeOffset endTime, IScheduler scheduler)
      {
          _source = source;
          _endTime = endTime;
          _scheduler = scheduler;
      }

      /// <summary>
      /// Combines this operator with a further end time, keeping whichever end time is earlier.
      /// </summary>
      /// <param name="endTime">The additional end time to apply.</param>
      /// <returns>
      /// This instance when its own end time is at or before <paramref name="endTime"/>; otherwise
      /// a new producer over the same source and scheduler that uses <paramref name="endTime"/>.
      /// </returns>
      public IObservable<TSource> Omega(DateTimeOffset endTime)
      {
          //
          // Minimum semantics:
          //
          //   t                     0--1--2--3--4--5--6--7->   t                     0--1--2--3--4--5--6--7->
          //
          //   xs                    --o--o--o--o--o--o--|      xs                    --o--o--o--o--o--o--|
          //   xs.TU(5AM)            --o--o--o--o--o|           xs.TU(3AM)            --o--o--o|
          //   xs.TU(5AM).TU(3AM)    --o--o--o|                 xs.TU(3AM).TU(5AM)    --o--o--o|
          //
          if (_endTime <= endTime)
          {
              return this;
          }

          return new TakeUntil<TSource>(_source, endTime, _scheduler);
      }

      /// <summary>Builds the per-subscription sink, publishes it through <paramref name="setSink"/> and starts it.</summary>
      /// <returns>The disposable returned by the sink's <c>Run</c> method.</returns>
      protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var sink = new _(this, observer, cancel);
          setSink(sink);
          return sink.Run();
      }

      /// <summary>
      /// Per-subscription sink. It observes the source directly and serializes source
      /// notifications against the scheduled completion with a private gate.
      /// </summary>
      private sealed class _ : Sink<TSource>, IObserver<TSource>
      {
          private readonly TakeUntil<TSource> _parent;

          // Created with the sink (instead of inside Run) so the lock target can never be null
          // or reassigned, regardless of when the first callback arrives.
          private readonly object _gate = new object();

          public _(TakeUntil<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              _parent = parent;
          }

          /// <summary>
          /// Schedules the completion action for the end time, then subscribes to the source.
          /// </summary>
          /// <returns>A composite of the scheduled action and the source subscription.</returns>
          public IDisposable Run()
          {
              // The order (timer first, source second) is kept exactly as it was.
              var timer = _parent._scheduler.Schedule(_parent._endTime, Tick);
              var subscription = _parent._source.SubscribeSafe(this);

              return StableCompositeDisposable.Create(timer, subscription);
          }

          // Runs on the scheduler at the end time: completes downstream and disposes the sink.
          private void Tick()
          {
              lock (_gate)
              {
                  base._observer.OnCompleted();
                  base.Dispose();
              }
          }

          public void OnNext(TSource value)
          {
              lock (_gate)
              {
                  base._observer.OnNext(value);
              }
          }

          public void OnError(Exception error)
          {
              lock (_gate)
              {
                  base._observer.OnError(error);
                  base.Dispose();
              }
          }

          public void OnCompleted()
          {
              lock (_gate)
              {
                  base._observer.OnCompleted();
                  base.Dispose();
              }
          }
      }
  }
  222. }