Switch.cs 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  // Licensed to the .NET Foundation under one or more agreements.
  // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Disposables;
  namespace System.Reactive.Linq.ObservableImpl
  {
      /// <summary>
      /// Producer that subscribes to a sequence of inner sequences and forwards
      /// only the notifications of the most recently received inner sequence.
      /// </summary>
      /// <typeparam name="TSource">Element type of the inner sequences.</typeparam>
      internal sealed class Switch<TSource> : Producer<TSource>
      {
          private readonly IObservable<IObservable<TSource>> _sources;

          /// <summary>
          /// Creates the producer for the given sequence of inner sequences.
          /// </summary>
          /// <param name="sources">Outer sequence whose elements are the inner sequences.</param>
          public Switch(IObservable<IObservable<TSource>> sources)
          {
              _sources = sources;
          }

          /// <summary>
          /// Creates a new sink bound to <paramref name="observer"/> and <paramref name="cancel"/>.
          /// </summary>
          protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel)
          {
              return new _(observer, cancel);
          }

          /// <summary>
          /// Creates the sink, publishes it through <paramref name="setSink"/> and starts it.
          /// </summary>
          /// <returns>The disposable returned by the sink's <c>Run</c> method.</returns>
          protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
          {
              var switchSink = new _(observer, cancel);

              // The sink is handed to the caller before it starts subscribing.
              setSink(switchSink);

              return switchSink.Run(this);
          }

          /// <summary>
          /// Observer of the outer sequence. Each incoming inner sequence gets a fresh
          /// id; inner observers forward to the downstream observer only while their id
          /// equals <see cref="_latest"/>.
          /// </summary>
          private sealed class _ : Sink<TSource>, IObserver<IObservable<TSource>>
          {
              // Guards _latest, _hasLatest, _isStopped and every call to the downstream observer.
              private readonly object _gate = new object();

              // Subscription to the outer sequence.
              private IDisposable _subscription;

              // Holds the subscription to the current inner sequence.
              private SerialDisposable _innerSubscription;

              // Set once the outer sequence has completed.
              private bool _isStopped;

              // Id of the most recently received inner sequence.
              private ulong _latest;

              // True from the arrival of an inner sequence until the latest one completes.
              private bool _hasLatest;

              public _(IObserver<TSource> observer, IDisposable cancel)
                  : base(observer, cancel)
              {
              }

              /// <summary>
              /// Resets the sink state and subscribes to the outer sequence of <paramref name="parent"/>.
              /// </summary>
              /// <returns>A disposable combining the outer and the inner subscription.</returns>
              public IDisposable Run(Switch<TSource> parent)
              {
                  _hasLatest = false;
                  _latest = 0UL;
                  _isStopped = false;
                  _innerSubscription = new SerialDisposable();

                  // The field is assigned before subscribing because OnCompleted reads
                  // _subscription and may be invoked during the SubscribeSafe call.
                  var outerSubscription = new SingleAssignmentDisposable();
                  _subscription = outerSubscription;
                  outerSubscription.Disposable = parent._sources.SubscribeSafe(this);

                  return StableCompositeDisposable.Create(_subscription, _innerSubscription);
              }

              /// <summary>
              /// Receives a new inner sequence, makes it the latest one and subscribes to it.
              /// </summary>
              public void OnNext(IObservable<TSource> value)
              {
                  ulong id;

                  lock (_gate)
                  {
                      _hasLatest = true;

                      // unchecked: the counter wraps around instead of throwing on overflow.
                      id = unchecked(++_latest);
                  }

                  // Assigning to the SerialDisposable replaces the previous inner subscription.
                  var inner = new SingleAssignmentDisposable();
                  _innerSubscription.Disposable = inner;
                  inner.Disposable = value.SubscribeSafe(new InnerObserver(this, id, inner));
              }

              /// <summary>
              /// Forwards an error of the outer sequence downstream, then disposes the sink.
              /// </summary>
              public void OnError(Exception error)
              {
                  lock (_gate)
                  {
                      _observer.OnError(error);
                  }

                  // Disposal happens after the gate has been released.
                  Dispose();
              }

              /// <summary>
              /// Marks the outer sequence as finished. Completes downstream right away only
              /// when no inner sequence is currently pending.
              /// </summary>
              public void OnCompleted()
              {
                  lock (_gate)
                  {
                      _subscription.Dispose();
                      _isStopped = true;

                      if (_hasLatest)
                      {
                          // The latest inner observer completes downstream when it finishes.
                          return;
                      }

                      _observer.OnCompleted();
                      Dispose();
                  }
              }

              /// <summary>
              /// Observer of a single inner sequence; it only acts on the downstream observer
              /// while its id is still the parent's latest id.
              /// </summary>
              private sealed class InnerObserver : IObserver<TSource>
              {
                  private readonly _ _parent;
                  private readonly ulong _id;
                  private readonly IDisposable _self;

                  /// <param name="parent">Sink that owns the gate and the downstream observer.</param>
                  /// <param name="id">Id assigned to this inner sequence when it arrived.</param>
                  /// <param name="self">Disposable holding this observer's own subscription.</param>
                  public InnerObserver(_ parent, ulong id, IDisposable self)
                  {
                      _parent = parent;
                      _id = id;
                      _self = self;
                  }

                  /// <summary>
                  /// Forwards the element unless a newer inner sequence has taken over.
                  /// </summary>
                  public void OnNext(TSource value)
                  {
                      lock (_parent._gate)
                      {
                          if (_parent._latest != _id)
                          {
                              return;
                          }

                          _parent._observer.OnNext(value);
                      }
                  }

                  /// <summary>
                  /// Releases this subscription; if still the latest, forwards the error and
                  /// disposes the parent sink.
                  /// </summary>
                  public void OnError(Exception error)
                  {
                      lock (_parent._gate)
                      {
                          _self.Dispose();

                          if (_parent._latest != _id)
                          {
                              return;
                          }

                          _parent._observer.OnError(error);
                          _parent.Dispose();
                      }
                  }

                  /// <summary>
                  /// Releases this subscription; if still the latest, clears the pending flag
                  /// and completes downstream when the outer sequence has already finished.
                  /// </summary>
                  public void OnCompleted()
                  {
                      lock (_parent._gate)
                      {
                          _self.Dispose();

                          if (_parent._latest != _id)
                          {
                              return;
                          }

                          _parent._hasLatest = false;

                          if (!_parent._isStopped)
                          {
                              // The outer sequence may still deliver further inner sequences.
                              return;
                          }

                          _parent._observer.OnCompleted();
                          _parent.Dispose();
                      }
                  }
              }
          }
      }
  }