WithLatestFrom.cs 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Disposables;
  5. namespace System.Reactive.Linq.ObservableImpl
  6. {
  7. internal sealed class WithLatestFrom<TFirst, TSecond, TResult> : Producer<TResult, WithLatestFrom<TFirst, TSecond, TResult>._>
  8. {
  9. private readonly IObservable<TFirst> _first;
  10. private readonly IObservable<TSecond> _second;
  11. private readonly Func<TFirst, TSecond, TResult> _resultSelector;
  12. public WithLatestFrom(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
  13. {
  14. _first = first;
  15. _second = second;
  16. _resultSelector = resultSelector;
  17. }
  18. protected override _ CreateSink(IObserver<TResult> observer) => new _(_resultSelector, observer);
  19. protected override void Run(_ sink) => sink.Run(_first, _second);
  20. internal sealed class _ : IdentitySink<TResult>
  21. {
  22. private readonly object _gate = new object();
  23. private readonly object _latestGate = new object();
  24. private readonly Func<TFirst, TSecond, TResult> _resultSelector;
  25. public _(Func<TFirst, TSecond, TResult> resultSelector, IObserver<TResult> observer)
  26. : base(observer)
  27. {
  28. _resultSelector = resultSelector;
  29. }
  30. private volatile bool _hasLatest;
  31. private TSecond? _latest;
  32. private IDisposable? _secondDisposable;
  33. public void Run(IObservable<TFirst> first, IObservable<TSecond> second)
  34. {
  35. var fstO = new FirstObserver(this);
  36. var sndO = new SecondObserver(this);
  37. Disposable.SetSingle(ref _secondDisposable, second.SubscribeSafe(sndO));
  38. SetUpstream(first.SubscribeSafe(fstO));
  39. }
  40. protected override void Dispose(bool disposing)
  41. {
  42. if (disposing)
  43. {
  44. Disposable.Dispose(ref _secondDisposable);
  45. }
  46. base.Dispose(disposing);
  47. }
  48. private sealed class FirstObserver : IObserver<TFirst>
  49. {
  50. private readonly _ _parent;
  51. public FirstObserver(_ parent)
  52. {
  53. _parent = parent;
  54. }
  55. public void OnCompleted()
  56. {
  57. lock (_parent._gate)
  58. {
  59. _parent.ForwardOnCompleted();
  60. }
  61. }
  62. public void OnError(Exception error)
  63. {
  64. lock (_parent._gate)
  65. {
  66. _parent.ForwardOnError(error);
  67. }
  68. }
  69. public void OnNext(TFirst value)
  70. {
  71. if (_parent._hasLatest) // Volatile read
  72. {
  73. TSecond latest;
  74. lock (_parent._latestGate)
  75. {
  76. latest = _parent._latest!; // NB: Not null when hasLatest is true.
  77. }
  78. TResult res;
  79. try
  80. {
  81. res = _parent._resultSelector(value, latest);
  82. }
  83. catch (Exception ex)
  84. {
  85. lock (_parent._gate)
  86. {
  87. _parent.ForwardOnError(ex);
  88. }
  89. return;
  90. }
  91. lock (_parent._gate)
  92. {
  93. _parent.ForwardOnNext(res);
  94. }
  95. }
  96. }
  97. }
  98. private sealed class SecondObserver : IObserver<TSecond>
  99. {
  100. private readonly _ _parent;
  101. public SecondObserver(_ parent)
  102. {
  103. _parent = parent;
  104. }
  105. public void OnCompleted()
  106. {
  107. Disposable.Dispose(ref _parent._secondDisposable);
  108. }
  109. public void OnError(Exception error)
  110. {
  111. lock (_parent._gate)
  112. {
  113. _parent.ForwardOnError(error);
  114. }
  115. }
  116. public void OnNext(TSecond value)
  117. {
  118. lock (_parent._latestGate)
  119. {
  120. _parent._latest = value;
  121. }
  122. if (!_parent._hasLatest)
  123. {
  124. _parent._hasLatest = true;
  125. }
  126. }
  127. }
  128. }
  129. }
  130. }