ObservableBase.cs 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Reactive.Concurrency;
  3. using System.Reactive.Disposables;
  namespace System.Reactive
  {
      /// <summary>
      /// Base class from which concrete IObservable&lt;T&gt; implementations can derive.
      /// </summary>
      /// <remarks>
      /// Derive from this class when a reusable, named observable type is wanted. When a
      /// single instance with custom subscription behavior is all that is needed, the
      /// Observable.Create method is the simpler choice.
      /// </remarks>
      /// <typeparam name="T">The type of the elements in the sequence.</typeparam>
      public abstract class ObservableBase<T> : IObservable<T>
      {
          /// <summary>
          /// Subscribes the given observer to the observable sequence.
          /// </summary>
          /// <param name="observer">Observer that will receive notifications from the observable sequence.</param>
          /// <returns>
          /// Disposable object representing the observer's subscription. This is the
          /// AutoDetachObserver&lt;T&gt; wrapper created around <paramref name="observer"/>.
          /// </returns>
          /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
          public IDisposable Subscribe(IObserver<T> observer)
          {
              if (observer == null)
              {
                  throw new ArgumentNullException("observer");
              }

              var wrapper = new AutoDetachObserver<T>(observer);

              if (!CurrentThreadScheduler.IsScheduleRequired)
              {
                  // No scheduling is required, so SubscribeCore runs inline.
                  AttachSubscription(wrapper);
                  return wrapper;
              }

              //
              // The Schedule call below is deliberately NOT wrapped in an exception handler
              // that forwards errors to OnError. Scheduling here runs the trampoline, so a
              // handler at this level would also catch exceptions thrown by user callbacks
              // that happen to execute on it. For example:
              //
              //    Observable.Return(42, Scheduler.CurrentThread)
              //              .Subscribe(x => { throw new Exception(); });
              //
              // The OnNext(42) call is queued on the trampoline and invoked once the scheduled
              // subscribe work returns. Catching here would route the exception thrown from
              // OnNext back into OnError, which looks like the sequence can't make up its mind.
              //
              CurrentThreadScheduler.Instance.Schedule(wrapper, ScheduledSubscribe);
              return wrapper;
          }

          /// <summary>
          /// Scheduler work item that performs the subscription for the scheduled path of
          /// <see cref="Subscribe"/>.
          /// </summary>
          /// <param name="_">Scheduler running the work item; not used.</param>
          /// <param name="autoDetachObserver">Wrapper observer that receives the subscription.</param>
          /// <returns>Always <c>Disposable.Empty</c>.</returns>
          private IDisposable ScheduledSubscribe(IScheduler _, AutoDetachObserver<T> autoDetachObserver)
          {
              AttachSubscription(autoDetachObserver);
              return Disposable.Empty;
          }

          /// <summary>
          /// Calls <see cref="SubscribeCore"/> and stores the resulting disposable on the
          /// wrapper. Any exception is handed to the wrapper's Fail method.
          /// </summary>
          /// <param name="target">Wrapper observer passed to <see cref="SubscribeCore"/>.</param>
          private void AttachSubscription(AutoDetachObserver<T> target)
          {
              try
              {
                  target.Disposable = SubscribeCore(target);
              }
              catch (Exception error)
              {
                  //
                  // Fail returning false can happen when SubscribeCore made a synchronous
                  // call to OnError and that handler itself threw, i.e. the exception
                  // comes from a handler. For compatibility with v1.x the exception is
                  // rethrown in that case. It should be rare, and indicates something is
                  // seriously wrong.
                  //
                  if (!target.Fail(error))
                  {
                      throw;
                  }
              }
          }

          /// <summary>
          /// Implement this method with the core subscription logic for the observable sequence.
          /// </summary>
          /// <param name="observer">Observer to send notifications to.</param>
          /// <returns>Disposable object representing an observer's subscription to the observable sequence.</returns>
          protected abstract IDisposable SubscribeCore(IObserver<T> observer);
      }
  }