AutoDetachObserver.cs 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Disposables;
  5. namespace System.Reactive
  6. {
  7. class AutoDetachObserver<T> : ObserverBase<T>
  8. {
  9. private readonly IObserver<T> observer;
  10. private readonly SingleAssignmentDisposable m = new SingleAssignmentDisposable();
  11. public AutoDetachObserver(IObserver<T> observer)
  12. {
  13. this.observer = observer;
  14. }
  15. public IDisposable Disposable
  16. {
  17. set { m.Disposable = value; }
  18. }
  19. protected override void OnNextCore(T value)
  20. {
  21. //
  22. // Safeguarding of the pipeline against rogue observers is required for proper
  23. // resource cleanup. Consider the following example:
  24. //
  25. // var xs = Observable.Interval(TimeSpan.FromSeconds(1));
  26. // var ys = <some random sequence>;
  27. // var res = xs.CombineLatest(ys, (x, y) => x + y);
  28. //
  29. // The marble diagram of the query above looks as follows:
  30. //
  31. // xs -----0-----1-----2-----3-----4-----5-----6-----7-----8-----9---...
  32. // | | | | | | | | |
  33. // ys --------4--+--5--+-----+--2--+--1--+-----+-----+--0--+-----+---...
  34. // | | | | | | | | | | | | | |
  35. // v v v v v v v v v v v v v v
  36. // res --------4--5--6--7-----8--5--6--5--6-----7-----8--7--8-----9---...
  37. // |
  38. // @#&
  39. //
  40. // Notice the free-threaded nature of Rx, where messages on the resulting sequence
  41. // are produced by either of the two input sequences to CombineLatest.
  42. //
  43. // Now assume an exception happens in the OnNext callback for the observer of res,
  44. // at the indicated point marked with @#& above. The callback runs in the context
  45. // of ys, so the exception will take down the scheduler thread of ys. This by
  46. // itself is a problem (that can be mitigated by a Catch operator on IScheduler),
  47. // but notice how the timer that produces xs is kept alive.
  48. //
  49. // The safe-guarding code below ensures the acquired resources are disposed when
  50. // the user callback throws.
  51. //
  52. var __noError = false;
  53. try
  54. {
  55. observer.OnNext(value);
  56. __noError = true;
  57. }
  58. finally
  59. {
  60. if (!__noError)
  61. Dispose();
  62. }
  63. }
  64. protected override void OnErrorCore(Exception exception)
  65. {
  66. try
  67. {
  68. observer.OnError(exception);
  69. }
  70. finally
  71. {
  72. Dispose();
  73. }
  74. }
  75. protected override void OnCompletedCore()
  76. {
  77. try
  78. {
  79. observer.OnCompleted();
  80. }
  81. finally
  82. {
  83. Dispose();
  84. }
  85. }
  86. protected override void Dispose(bool disposing)
  87. {
  88. base.Dispose(disposing);
  89. if (disposing)
  90. m.Dispose();
  91. }
  92. }
  93. }