SkipUntil.cs 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. namespace System.Reactive.Linq.ObservableImpl
  7. {
  8. internal sealed class SkipUntil<TSource, TOther> : Producer<TSource, SkipUntil<TSource, TOther>._>
  9. {
  10. private readonly IObservable<TSource> _source;
  11. private readonly IObservable<TOther> _other;
  12. public SkipUntil(IObservable<TSource> source, IObservable<TOther> other)
  13. {
  14. _source = source;
  15. _other = other;
  16. }
  17. protected override _ CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
  18. protected override IDisposable Run(_ sink) => sink.Run(this);
  19. internal sealed class _ : Sink<TSource>
  20. {
  21. public _(IObserver<TSource> observer, IDisposable cancel)
  22. : base(observer, cancel)
  23. {
  24. }
  25. public IDisposable Run(SkipUntil<TSource, TOther> parent)
  26. {
  27. var sourceObserver = new SourceObserver(this);
  28. var otherObserver = new OtherObserver(this, sourceObserver);
  29. var sourceSubscription = parent._source.SubscribeSafe(sourceObserver);
  30. var otherSubscription = parent._other.SubscribeSafe(otherObserver);
  31. sourceObserver.Disposable = sourceSubscription;
  32. otherObserver.Disposable = otherSubscription;
  33. return StableCompositeDisposable.Create(
  34. sourceSubscription,
  35. otherSubscription
  36. );
  37. }
  38. private sealed class SourceObserver : IObserver<TSource>
  39. {
  40. private readonly _ _parent;
  41. public volatile IObserver<TSource> _observer;
  42. private readonly SingleAssignmentDisposable _subscription;
  43. public SourceObserver(_ parent)
  44. {
  45. _parent = parent;
  46. _observer = NopObserver<TSource>.Instance;
  47. _subscription = new SingleAssignmentDisposable();
  48. }
  49. public IDisposable Disposable
  50. {
  51. set { _subscription.Disposable = value; }
  52. }
  53. public void OnNext(TSource value)
  54. {
  55. _observer.OnNext(value);
  56. }
  57. public void OnError(Exception error)
  58. {
  59. _parent._observer.OnError(error);
  60. _parent.Dispose();
  61. }
  62. public void OnCompleted()
  63. {
  64. _observer.OnCompleted();
  65. _subscription.Dispose(); // We can't cancel the other stream yet, it may be on its way to dispatch an OnError message and we don't want to have a race.
  66. }
  67. }
  68. private sealed class OtherObserver : IObserver<TOther>
  69. {
  70. private readonly _ _parent;
  71. private readonly SourceObserver _sourceObserver;
  72. private readonly SingleAssignmentDisposable _subscription;
  73. public OtherObserver(_ parent, SourceObserver sourceObserver)
  74. {
  75. _parent = parent;
  76. _sourceObserver = sourceObserver;
  77. _subscription = new SingleAssignmentDisposable();
  78. }
  79. public IDisposable Disposable
  80. {
  81. set { _subscription.Disposable = value; }
  82. }
  83. public void OnNext(TOther value)
  84. {
  85. _sourceObserver._observer = _parent._observer;
  86. _subscription.Dispose();
  87. }
  88. public void OnError(Exception error)
  89. {
  90. _parent._observer.OnError(error);
  91. _parent.Dispose();
  92. }
  93. public void OnCompleted()
  94. {
  95. _subscription.Dispose();
  96. }
  97. }
  98. }
  99. }
  100. internal sealed class SkipUntil<TSource> : Producer<TSource, SkipUntil<TSource>._>
  101. {
  102. private readonly IObservable<TSource> _source;
  103. private readonly DateTimeOffset _startTime;
  104. internal readonly IScheduler _scheduler;
  105. public SkipUntil(IObservable<TSource> source, DateTimeOffset startTime, IScheduler scheduler)
  106. {
  107. _source = source;
  108. _startTime = startTime;
  109. _scheduler = scheduler;
  110. }
  111. public IObservable<TSource> Combine(DateTimeOffset startTime)
  112. {
  113. //
  114. // Maximum semantics:
  115. //
  116. // t 0--1--2--3--4--5--6--7-> t 0--1--2--3--4--5--6--7->
  117. //
  118. // xs --o--o--o--o--o--o--| xs --o--o--o--o--o--o--|
  119. // xs.SU(5AM) xxxxxxxxxxxxxxxx-o--| xs.SU(3AM) xxxxxxxxxx-o--o--o--|
  120. // xs.SU(5AM).SU(3AM) xxxxxxxxx--------o--| xs.SU(3AM).SU(5AM) xxxxxxxxxxxxxxxx-o--|
  121. //
  122. if (startTime <= _startTime)
  123. return this;
  124. else
  125. return new SkipUntil<TSource>(_source, startTime, _scheduler);
  126. }
  127. protected override _ CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
  128. protected override IDisposable Run(_ sink) => sink.Run(this);
  129. internal sealed class _ : Sink<TSource>, IObserver<TSource>
  130. {
  131. private volatile bool _open;
  132. public _(IObserver<TSource> observer, IDisposable cancel)
  133. : base(observer, cancel)
  134. {
  135. }
  136. public IDisposable Run(SkipUntil<TSource> parent)
  137. {
  138. var t = parent._scheduler.Schedule(parent._startTime, Tick);
  139. var d = parent._source.SubscribeSafe(this);
  140. return StableCompositeDisposable.Create(t, d);
  141. }
  142. private void Tick()
  143. {
  144. _open = true;
  145. }
  146. public void OnNext(TSource value)
  147. {
  148. if (_open)
  149. base._observer.OnNext(value);
  150. }
  151. public void OnError(Exception error)
  152. {
  153. base._observer.OnError(error);
  154. base.Dispose();
  155. }
  156. public void OnCompleted()
  157. {
  158. base._observer.OnCompleted();
  159. base.Dispose();
  160. }
  161. }
  162. }
  163. }