AutoDetachObserver.cs 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. #nullable disable
  5. using System.Reactive.Disposables;
  6. namespace System.Reactive
  7. {
  // Observer wrapper that relays notifications to an inner observer and disposes
  // itself once a terminal notification has been relayed, or as soon as the inner
  // observer's OnNext callback throws.
  internal sealed class AutoDetachObserver<T> : ObserverBase<T>, ISafeObserver<T>
  {
      // Downstream observer that receives every relayed notification.
      private readonly IObserver<T> _observer;

      // Resource handed over via SetResource; passed to Disposable.TryDispose
      // when this observer is disposed.
      private IDisposable _disposable;

      // Wraps the given observer. The reference is stored as-is (no null check).
      public AutoDetachObserver(IObserver<T> observer) => _observer = observer;

      // Stores the resource to release on disposal, delegating the assignment
      // to Disposable.SetSingle.
      public void SetResource(IDisposable resource) => Disposable.SetSingle(ref _disposable, resource);

      // Relays a value to the wrapped observer, disposing this observer if the
      // callback throws. The exception itself is not caught and keeps propagating.
      protected override void OnNextCore(T value)
      {
          //
          // Why a rogue observer must not be allowed to leak resources. Take this query:
          //
          //   var xs  = Observable.Interval(TimeSpan.FromSeconds(1));
          //   var ys  = <some random sequence>;
          //   var res = xs.CombineLatest(ys, (x, y) => x + y);
          //
          // Its marble diagram looks as follows:
          //
          //   xs   -----0-----1-----2-----3-----4-----5-----6-----7-----8-----9---...
          //                   |     |     |     |     |     |     |     |     |
          //   ys   --------4--+--5--+-----+--2--+--1--+-----+-----+--0--+-----+---...
          //                |  |  |  |     |  |  |  |  |     |     |  |  |     |
          //                v  v  v  v     v  v  v  v  v     v     v  v  v     v
          //   res  --------4--5--6--7-----8--5--6--5--6-----7-----8--7--8-----9---...
          //                                  |
          //                                 @#&
          //
          // Rx is free-threaded: either input of CombineLatest may be the one that
          // produces a message on the resulting sequence.
          //
          // Suppose the OnNext callback of the observer of res throws at the point
          // marked @#&. That callback runs in the context of ys, so the exception
          // takes down the scheduler thread of ys. That alone is a problem (one that
          // a Catch operator on IScheduler can mitigate), but worse, the timer that
          // drives xs would stay alive.
          //
          // The guard below therefore disposes the acquired resources whenever the
          // user callback throws.
          //
          // A flag checked in a finally block is used (rather than catch/rethrow) so
          // the exception is never caught here; `faulted` is cleared only after the
          // callback has returned normally.
          var faulted = true;
          try
          {
              _observer.OnNext(value);
              faulted = false;
          }
          finally
          {
              if (faulted)
              {
                  Dispose();
              }
          }
      }

      // Relays the error to the wrapped observer, then always disposes this
      // observer, whether or not the callback throws.
      protected override void OnErrorCore(Exception exception)
      {
          try
          {
              _observer.OnError(exception);
          }
          finally
          {
              Dispose();
          }
      }

      // Relays completion to the wrapped observer, then always disposes this
      // observer, whether or not the callback throws.
      protected override void OnCompletedCore()
      {
          try
          {
              _observer.OnCompleted();
          }
          finally
          {
              Dispose();
          }
      }

      // Runs the base class disposal first; on an explicit dispose (disposing ==
      // true) additionally hands the stored resource to Disposable.TryDispose.
      protected override void Dispose(bool disposing)
      {
          base.Dispose(disposing);

          if (!disposing)
          {
              return;
          }

          Disposable.TryDispose(ref _disposable);
      }
  }
  98. }