AutoDetachObserver.cs 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Reactive.Disposables;
  3. namespace System.Reactive
  4. {
  /// <summary>
  /// Observer wrapper that forwards notifications to an inner observer and
  /// disposes itself (and the disposable assigned through
  /// <see cref="Disposable"/>) when the sequence terminates or when the inner
  /// observer's OnNext callback throws.
  /// </summary>
  /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
  class AutoDetachObserver<T> : ObserverBase<T>
  {
    // Observer that receives the forwarded notifications.
    private readonly IObserver<T> _observer;

    // Holder for the disposable handed in through the Disposable property;
    // disposed from Dispose(bool) when disposing is true.
    private readonly SingleAssignmentDisposable _subscription = new SingleAssignmentDisposable();

    /// <summary>
    /// Creates a wrapper around the specified observer.
    /// </summary>
    /// <param name="observer">Observer to forward notifications to. Not checked for null here.</param>
    public AutoDetachObserver(IObserver<T> observer)
    {
      _observer = observer;
    }

    /// <summary>
    /// Sets the disposable that is disposed together with this observer.
    /// The value is stored in a <see cref="SingleAssignmentDisposable"/>.
    /// </summary>
    public IDisposable Disposable
    {
      set
      {
        _subscription.Disposable = value;
      }
    }

    /// <summary>
    /// Forwards an element to the wrapped observer; if the wrapped observer
    /// throws, this observer is disposed before the exception propagates.
    /// </summary>
    /// <param name="value">Element to forward.</param>
    protected override void OnNextCore(T value)
    {
      //
      // The pipeline has to be protected against rogue observers in order to
      // clean up resources properly. Take the following query as an example:
      //
      //   var xs  = Observable.Interval(TimeSpan.FromSeconds(1));
      //   var ys  = <some random sequence>;
      //   var res = xs.CombineLatest(ys, (x, y) => x + y);
      //
      // Its marble diagram looks like this:
      //
      //  xs  -----0-----1-----2-----3-----4-----5-----6-----7-----8-----9---...
      //                 |     |     |     |     |     |     |     |     |
      //  ys  --------4--+--5--+-----+--2--+--1--+-----+-----+--0--+-----+---...
      //              |  |  |  |     |  |  |  |  |     |     |  |  |     |
      //              v  v  v  v     v  v  v  v  v     v     v  v  v     v
      // res  --------4--5--6--7-----8--5--6--5--6-----7-----8--7--8-----9---...
      //                                |
      //                               @#&
      //
      // Rx is free-threaded: a message on the resulting sequence can be
      // produced by either of the two inputs of CombineLatest.
      //
      // Suppose the OnNext callback of the observer of res throws at the point
      // marked with @#& above. That callback runs in the context of ys, so the
      // exception takes down the scheduler thread of ys. That is a problem in
      // its own right (one that a Catch operator on IScheduler can mitigate),
      // but note that the timer producing xs would also be kept alive.
      //
      // The guard below makes sure the acquired resources are disposed when
      // the user callback throws.
      //
      var callbackFailed = true;
      try
      {
        _observer.OnNext(value);
        callbackFailed = false;
      }
      finally
      {
        // Only reached with the flag still set when OnNext did not return normally.
        if (callbackFailed)
        {
          Dispose();
        }
      }
    }

    /// <summary>
    /// Forwards the error to the wrapped observer and then disposes this
    /// observer, whether or not the wrapped observer throws.
    /// </summary>
    /// <param name="exception">Error to forward.</param>
    protected override void OnErrorCore(Exception exception)
    {
      try
      {
        _observer.OnError(exception);
      }
      finally
      {
        Dispose();
      }
    }

    /// <summary>
    /// Forwards the completion signal to the wrapped observer and then disposes
    /// this observer, whether or not the wrapped observer throws.
    /// </summary>
    protected override void OnCompletedCore()
    {
      try
      {
        _observer.OnCompleted();
      }
      finally
      {
        Dispose();
      }
    }

    /// <summary>
    /// Runs the base class disposal logic first and, when
    /// <paramref name="disposing"/> is true, disposes the disposable held by
    /// this observer.
    /// </summary>
    /// <param name="disposing">Passed through to the base class; the held disposable is released only when true.</param>
    protected override void Dispose(bool disposing)
    {
      base.Dispose(disposing);

      if (!disposing)
      {
        return;
      }

      _subscription.Dispose();
    }
  }
  91. }