1
0

Sink.cs 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Disposables;
  5. using System.Threading;
  6. namespace System.Reactive
  7. {
  /// <summary>
  /// Contract for forwarding observer-style notifications that carry values of type <typeparamref name="TTarget"/>.
  /// </summary>
  /// <typeparam name="TTarget">Type of the values accepted by <see cref="ForwardOnNext"/>; the parameter is contravariant.</typeparam>
  internal interface ISink<in TTarget>
  {
      /// <summary>
      /// Forwards a value notification.
      /// </summary>
      /// <param name="value">The value to forward.</param>
      void ForwardOnNext(TTarget value);

      /// <summary>
      /// Forwards an error notification.
      /// </summary>
      /// <param name="error">The exception to forward.</param>
      void ForwardOnError(Exception error);

      /// <summary>
      /// Forwards a completion notification.
      /// </summary>
      void ForwardOnCompleted();
  }
  /// <summary>
  /// Base class for sinks that forward notifications to a downstream observer and hold one upstream disposable.
  /// Disposing the sink replaces the downstream observer with <c>NopObserver&lt;TTarget&gt;.Instance</c>.
  /// </summary>
  /// <typeparam name="TTarget">Type of the elements forwarded to the downstream observer.</typeparam>
  internal abstract class Sink<TTarget> : ISink<TTarget>, IDisposable
  {
      private IDisposable _upstream;
      private volatile IObserver<TTarget> _observer;

      /// <summary>
      /// Creates a sink that forwards to the given observer.
      /// </summary>
      /// <param name="observer">The downstream observer that receives forwarded notifications.</param>
      protected Sink(IObserver<TTarget> observer)
      {
          _observer = observer;
      }

      /// <summary>
      /// Swaps the downstream observer for the no-op observer and, if a different observer was
      /// installed before the swap, runs <see cref="Dispose(bool)"/>.
      /// </summary>
      public void Dispose()
      {
          // The exchange is atomic, so only one caller can observe a non-nop previous observer;
          // that is what limits Dispose(bool) to at most one invocation through this method.
          if (Interlocked.Exchange(ref _observer, NopObserver<TTarget>.Instance) != NopObserver<TTarget>.Instance)
          {
              Dispose(true);
          }
      }

      /// <summary>
      /// Override this method to dispose additional resources.
      /// The method is guaranteed to be called at most once.
      /// </summary>
      /// <param name="disposing">If true, the method was called from <see cref="Dispose()"/>.</param>
      protected virtual void Dispose(bool disposing)
      {
          // The observer is not reassigned to NopObserver<TTarget>.Instance here: Dispose() has
          // already done that before calling this method. Calling base.Dispose(true) directly is
          // not a proper disposal, and because Sink is internal this can pretty much be enforced.
          Disposable.TryDispose(ref _upstream);
      }

      /// <summary>
      /// Passes a value to the current downstream observer.
      /// </summary>
      /// <param name="value">The value to forward.</param>
      public void ForwardOnNext(TTarget value)
      {
          _observer.OnNext(value);
      }

      /// <summary>
      /// Signals completion to the current downstream observer and then disposes the sink.
      /// </summary>
      public void ForwardOnCompleted()
      {
          try
          {
              _observer.OnCompleted();
          }
          finally
          {
              // Dispose even when the observer throws, so the upstream resource is not leaked
              // and the sink is muted after a terminal notification.
              Dispose();
          }
      }

      /// <summary>
      /// Signals an error to the current downstream observer and then disposes the sink.
      /// </summary>
      /// <param name="error">The exception to forward.</param>
      public void ForwardOnError(Exception error)
      {
          try
          {
              _observer.OnError(error);
          }
          finally
          {
              // Dispose even when the observer throws, so the upstream resource is not leaked
              // and the sink is muted after a terminal notification.
              Dispose();
          }
      }

      /// <summary>
      /// Hands the upstream disposable to <c>Disposable.SetSingle</c> for storage in this sink.
      /// </summary>
      /// <param name="upstream">The disposable representing the upstream resource.</param>
      protected void SetUpstream(IDisposable upstream)
      {
          // NOTE(review): presumably single-assignment semantics - confirm against Disposable.SetSingle.
          Disposable.SetSingle(ref _upstream, upstream);
      }

      /// <summary>
      /// Releases the upstream disposable via <c>Disposable.TryDispose</c> without replacing the downstream observer.
      /// </summary>
      protected void DisposeUpstream()
      {
          Disposable.TryDispose(ref _upstream);
      }
  }
  /// <summary>
  /// Base class for implementation of query operators, providing a lightweight sink that can be disposed to mute the outgoing observer.
  /// </summary>
  /// <typeparam name="TSource">Type of the elements this sink observes through its <see cref="IObserver{TSource}"/> implementation.</typeparam>
  /// <typeparam name="TTarget">Type of the resulting sequence's elements.</typeparam>
  /// <remarks>
  /// Implementations of sinks are responsible to enforce the message grammar on the associated observer.
  /// Upon sending a terminal message, a pairing Dispose call should be made to trigger cancellation of
  /// related resources and to mute the outgoing observer.
  /// </remarks>
  internal abstract class Sink<TSource, TTarget> : Sink<TTarget>, IObserver<TSource>
  {
      /// <summary>
      /// Creates a sink that forwards to the given observer.
      /// </summary>
      /// <param name="observer">The outgoing observer, passed on to the base class.</param>
      protected Sink(IObserver<TTarget> observer)
          : base(observer)
      {
      }

      /// <summary>
      /// Subscribes this sink to <paramref name="source"/> and registers the resulting subscription as the upstream resource.
      /// </summary>
      /// <param name="source">The sequence to subscribe to.</param>
      public virtual void Run(IObservable<TSource> source)
      {
          var subscription = source.SubscribeSafe(this);
          SetUpstream(subscription);
      }

      /// <summary>
      /// Handles an element received from the source; each concrete sink supplies its own handling.
      /// </summary>
      /// <param name="value">The element received from the source.</param>
      public abstract void OnNext(TSource value);

      /// <summary>
      /// Handles a source error; by default it is passed on through <c>ForwardOnError</c>.
      /// </summary>
      /// <param name="error">The exception reported by the source.</param>
      public virtual void OnError(Exception error)
      {
          ForwardOnError(error);
      }

      /// <summary>
      /// Handles source completion; by default it is passed on through <c>ForwardOnCompleted</c>.
      /// </summary>
      public virtual void OnCompleted()
      {
          ForwardOnCompleted();
      }

      /// <summary>
      /// Creates a new observer of <typeparamref name="TTarget"/> whose notifications are routed to this sink's <c>ForwardOn*</c> methods.
      /// </summary>
      /// <returns>A new forwarding observer bound to this sink.</returns>
      public IObserver<TTarget> GetForwarder()
      {
          return new ForwardingObserver(this);
      }

      /// <summary>
      /// Observer that relays every notification to the <c>ForwardOn*</c> methods of the sink it was created for.
      /// </summary>
      private sealed class ForwardingObserver : IObserver<TTarget>
      {
          private readonly Sink<TSource, TTarget> _sink;

          public ForwardingObserver(Sink<TSource, TTarget> sink)
          {
              _sink = sink;
          }

          public void OnNext(TTarget value)
          {
              _sink.ForwardOnNext(value);
          }

          public void OnError(Exception error)
          {
              _sink.ForwardOnError(error);
          }

          public void OnCompleted()
          {
              _sink.ForwardOnCompleted();
          }
      }
  }
  93. }