Sink.cs 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Disposables;
  5. namespace System.Reactive
  6. {
  /// <summary>
  /// Contract for a sink that can forward the three observer notifications
  /// (next value, completion, error) for elements of type <typeparamref name="TTarget"/>.
  /// </summary>
  /// <typeparam name="TTarget">Type of the values being forwarded. Declared contravariant.</typeparam>
  internal interface ISink<in TTarget>
  {
      /// <summary>Forwards a single value.</summary>
      /// <param name="value">The value to forward.</param>
      void ForwardOnNext(TTarget value);

      /// <summary>Forwards a completion notification.</summary>
      void ForwardOnCompleted();

      /// <summary>Forwards an error notification.</summary>
      /// <param name="error">The exception to forward.</param>
      void ForwardOnError(Exception error);
  }
  /// <summary>
  /// Base sink that relays notifications to a downstream observer and holds a
  /// reference to an upstream disposable. Disposing the sink swaps the observer for
  /// <c>NopObserver&lt;TTarget&gt;.Instance</c>, so later <c>Forward*</c> calls no longer
  /// reach the original observer, and disposes the upstream.
  /// </summary>
  /// <typeparam name="TTarget">Type of the values delivered to the downstream observer.</typeparam>
  internal abstract class Sink<TTarget> : ISink<TTarget>, IDisposable
  {
      // Volatile because Dispose replaces the reference while the Forward* methods read it.
      // NOTE(review): presumably these can run on different threads - confirm with callers.
      private volatile IObserver<TTarget> _observer;

      // Upstream handle; only touched through the Disposable.SetSingle / Disposable.TryDispose helpers.
      private IDisposable _upstream;

      /// <summary>
      /// Creates a sink that forwards to <paramref name="observer"/>.
      /// </summary>
      /// <param name="observer">Downstream observer that receives forwarded notifications. Not validated here.</param>
      protected Sink(IObserver<TTarget> observer)
      {
          _observer = observer;
      }

      /// <summary>
      /// Disposes the sink by invoking <see cref="Dispose(bool)"/> with <c>true</c>.
      /// </summary>
      public void Dispose() => Dispose(true);

      /// <summary>
      /// Replaces the downstream observer with <c>NopObserver&lt;TTarget&gt;.Instance</c> and then
      /// disposes the upstream disposable.
      /// </summary>
      /// <param name="disposing">Not read by this base implementation.</param>
      protected virtual void Dispose(bool disposing)
      {
          // Mute first, then tear down the upstream - same order as before.
          _observer = NopObserver<TTarget>.Instance;
          DisposeUpstream();
      }

      /// <summary>
      /// Passes <paramref name="value"/> to the current observer's <c>OnNext</c>.
      /// </summary>
      /// <param name="value">The value to forward.</param>
      public void ForwardOnNext(TTarget value) => _observer.OnNext(value);

      /// <summary>
      /// Calls the current observer's <c>OnCompleted</c> and then disposes this sink.
      /// </summary>
      /// <remarks>If the observer throws, the exception propagates and <see cref="Dispose()"/> is not reached.</remarks>
      public void ForwardOnCompleted()
      {
          _observer.OnCompleted();
          Dispose();
      }

      /// <summary>
      /// Calls the current observer's <c>OnError</c> with <paramref name="error"/> and then disposes this sink.
      /// </summary>
      /// <param name="error">The exception to forward.</param>
      /// <remarks>If the observer throws, the exception propagates and <see cref="Dispose()"/> is not reached.</remarks>
      public void ForwardOnError(Exception error)
      {
          _observer.OnError(error);
          Dispose();
      }

      /// <summary>
      /// Stores <paramref name="upstream"/> as the upstream disposable via <c>Disposable.SetSingle</c>.
      /// </summary>
      /// <param name="upstream">The disposable representing the upstream subscription.</param>
      /// <remarks>
      /// NOTE(review): presumably single-assignment semantics (a second assignment, or an assignment
      /// after disposal, is handled by the helper) - confirm against <c>Disposable.SetSingle</c>.
      /// </remarks>
      protected void SetUpstream(IDisposable upstream) => Disposable.SetSingle(ref _upstream, upstream);

      /// <summary>
      /// Disposes the upstream disposable via <c>Disposable.TryDispose</c> without touching the observer.
      /// </summary>
      protected void DisposeUpstream() => Disposable.TryDispose(ref _upstream);
  }
  /// <summary>
  /// Base class for query operator implementations: a lightweight sink that observes a source
  /// sequence and can be disposed to mute the outgoing observer.
  /// </summary>
  /// <typeparam name="TSource">Type of the elements received from the source sequence.</typeparam>
  /// <typeparam name="TTarget">Type of the resulting sequence's elements.</typeparam>
  /// <remarks>
  /// Sink implementations are responsible for enforcing the message grammar on the associated observer.
  /// After sending a terminal message, a matching Dispose call should be made to trigger cancellation
  /// of related resources and to mute the outgoing observer.
  /// </remarks>
  internal abstract class Sink<TSource, TTarget> : Sink<TTarget>, IObserver<TSource>
  {
      /// <summary>
      /// Creates a sink that forwards to <paramref name="observer"/>.
      /// </summary>
      /// <param name="observer">Downstream observer handed to the base sink.</param>
      protected Sink(IObserver<TTarget> observer)
          : base(observer)
      {
      }

      /// <summary>
      /// Subscribes this sink to <paramref name="source"/> using <c>SubscribeSafe</c> and records the
      /// resulting subscription as the upstream disposable.
      /// </summary>
      /// <param name="source">The sequence to observe.</param>
      public virtual void Run(IObservable<TSource> source)
      {
          var subscription = source.SubscribeSafe(this);
          SetUpstream(subscription);
      }

      /// <summary>
      /// Handles an element from the source; derived sinks define how it is processed.
      /// </summary>
      /// <param name="value">The element received from the source.</param>
      public abstract void OnNext(TSource value);

      /// <summary>
      /// Default error handling: relays <paramref name="error"/> through <c>ForwardOnError</c>.
      /// </summary>
      /// <param name="error">The exception reported by the source.</param>
      public virtual void OnError(Exception error) => ForwardOnError(error);

      /// <summary>
      /// Default completion handling: relays completion through <c>ForwardOnCompleted</c>.
      /// </summary>
      public virtual void OnCompleted() => ForwardOnCompleted();

      /// <summary>
      /// Creates a new observer of <typeparamref name="TTarget"/> whose notifications are routed to
      /// this sink's <c>ForwardOnNext</c>, <c>ForwardOnError</c> and <c>ForwardOnCompleted</c>.
      /// </summary>
      /// <returns>A fresh forwarding observer; a new instance is allocated on every call.</returns>
      public IObserver<TTarget> GetForwarder() => new _(this);

      /// <summary>
      /// Observer adapter that hands each notification straight to the owning sink's Forward* methods.
      /// </summary>
      private sealed class _ : IObserver<TTarget>
      {
          private readonly Sink<TSource, TTarget> _sink;

          public _(Sink<TSource, TTarget> forward)
          {
              _sink = forward;
          }

          public void OnNext(TTarget value) => _sink.ForwardOnNext(value);

          public void OnError(Exception error) => _sink.ForwardOnError(error);

          public void OnCompleted() => _sink.ForwardOnCompleted();
      }
  }
  99. }