ObservableBase.cs 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. namespace System.Reactive
  7. {
  /// <summary>
  /// Abstract base class for implementations of the IObservable&lt;T&gt; interface.
  /// </summary>
  /// <remarks>
  /// If you don't need a named type to create an observable sequence (i.e. you need
  /// an instance rather than a reusable type), use the Observable.Create method to create
  /// an observable sequence with specified subscription behavior.
  /// </remarks>
  /// <typeparam name="T">The type of the elements in the sequence.</typeparam>
  public abstract class ObservableBase<T> : IObservable<T>
  {
      /// <summary>
      /// Subscribes the given observer to the observable sequence.
      /// </summary>
      /// <remarks>
      /// The observer is wrapped in an auto-detach observer, and that wrapper is what gets
      /// passed to <see cref="SubscribeCore"/> and returned to the caller as the subscription.
      /// </remarks>
      /// <param name="observer">Observer that will receive notifications from the observable sequence.</param>
      /// <returns>Disposable object representing an observer's subscription to the observable sequence.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
      public IDisposable Subscribe(IObserver<T> observer)
      {
          if (observer == null)
          {
              throw new ArgumentNullException("observer");
          }

          var autoDetachObserver = new AutoDetachObserver<T>(observer);

          if (CurrentThreadScheduler.IsScheduleRequired)
          {
              //
              // Notice we don't protect this piece of code using an exception handler to
              // redirect errors to the OnError channel. This call to Schedule will run the
              // trampoline, so we'd be catching all exceptions, including those from user
              // callbacks that happen to run there. For example, consider:
              //
              //    Observable.Return(42, Scheduler.CurrentThread)
              //              .Subscribe(x => { throw new Exception(); });
              //
              // Here, the OnNext(42) call would be scheduled on the trampoline, so when we
              // return from the scheduled Subscribe call, the CurrentThreadScheduler moves
              // on to invoking this work item. Too much protection here would cause the
              // exception thrown in OnNext to circle back to OnError, which looks like the
              // sequence can't make up its mind.
              //
              CurrentThreadScheduler.Instance.Schedule(autoDetachObserver, ScheduledSubscribe);
          }
          else
          {
              // No scheduling needed: run the core subscription logic right here.
              SubscribeCoreAndAttach(autoDetachObserver);
          }

          return autoDetachObserver;
      }

      /// <summary>
      /// Work item handed to the current-thread scheduler by <see cref="Subscribe"/> when
      /// scheduling is required. Runs the core subscription logic for the given wrapper.
      /// </summary>
      /// <param name="_">Scheduler running the work item; not used.</param>
      /// <param name="autoDetachObserver">Wrapper around the subscribing observer.</param>
      /// <returns>
      /// Always <c>Disposable.Empty</c>; the disposable produced by <see cref="SubscribeCore"/>
      /// is stored on <paramref name="autoDetachObserver"/> rather than returned.
      /// </returns>
      private IDisposable ScheduledSubscribe(IScheduler _, AutoDetachObserver<T> autoDetachObserver)
      {
          SubscribeCoreAndAttach(autoDetachObserver);

          return Disposable.Empty;
      }

      /// <summary>
      /// Invokes <see cref="SubscribeCore"/> with the given wrapper and stores the resulting
      /// disposable on it. Shared by the direct and the scheduled subscription paths so the
      /// error-handling policy lives in exactly one place.
      /// </summary>
      /// <param name="autoDetachObserver">Wrapper around the subscribing observer.</param>
      private void SubscribeCoreAndAttach(AutoDetachObserver<T> autoDetachObserver)
      {
          try
          {
              autoDetachObserver.Disposable = SubscribeCore(autoDetachObserver);
          }
          catch (Exception exception)
          {
              //
              // This can happen when there's a synchronous callback to OnError in the
              // implementation of SubscribeCore, which also throws. So, we're seeing
              // an exception being thrown from a handler.
              //
              // For compat with v1.x, we rethrow the exception in this case, keeping
              // in mind this should be rare but if it happens, something has gone
              // badly wrong.
              //
              // NOTE(review): Fail presumably routes the exception to the observer's
              // OnError and reports whether it could do so - confirm against
              // AutoDetachObserver. When it returns false the exception is rethrown.
              //
              if (!autoDetachObserver.Fail(exception))
              {
                  throw;
              }
          }
      }

      /// <summary>
      /// Implement this method with the core subscription logic for the observable sequence.
      /// </summary>
      /// <param name="observer">Observer to send notifications to.</param>
      /// <returns>Disposable object representing an observer's subscription to the observable sequence.</returns>
      protected abstract IDisposable SubscribeCore(IObserver<T> observer);
  }
  101. }