// ObservableBase.cs

  // Licensed to the .NET Foundation under one or more agreements.
  // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  // See the LICENSE file in the project root for more information.
  using System.Reactive.Concurrency;

  namespace System.Reactive
  {
      /// <summary>
      /// Abstract base class for implementations of the <see cref="IObservable{T}"/> interface.
      /// </summary>
      /// <remarks>
      /// If you don't need a named type to create an observable sequence (i.e. you need
      /// an instance rather than a reusable type), use the Observable.Create method to create
      /// an observable sequence with specified subscription behavior.
      /// </remarks>
      /// <typeparam name="T">The type of the elements in the sequence.</typeparam>
      public abstract class ObservableBase<T> : IObservable<T>
      {
          /// <summary>
          /// Subscribes the given observer to the observable sequence.
          /// </summary>
          /// <remarks>
          /// The observer is wrapped in an <c>AutoDetachObserver</c>, which is also the object
          /// returned to the caller. When <c>CurrentThreadScheduler.IsScheduleRequired</c> is
          /// <c>true</c>, the call to <see cref="SubscribeCore"/> is scheduled on
          /// <c>CurrentThreadScheduler.Instance</c>; otherwise it is invoked directly.
          /// </remarks>
          /// <param name="observer">Observer that will receive notifications from the observable sequence.</param>
          /// <returns>Disposable object representing an observer's subscription to the observable sequence.</returns>
          /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
          public IDisposable Subscribe(IObserver<T> observer)
          {
              if (observer == null)
              {
                  throw new ArgumentNullException(nameof(observer));
              }

              var autoDetachObserver = new AutoDetachObserver<T>(observer);

              if (CurrentThreadScheduler.IsScheduleRequired)
              {
                  //
                  // Notice we don't protect this piece of code using an exception handler to
                  // redirect errors to the OnError channel. This call to Schedule will run the
                  // trampoline, so we'd be catching all exceptions, including those from user
                  // callbacks that happen to run there. For example, consider:
                  //
                  //    Observable.Return(42, Scheduler.CurrentThread)
                  //              .Subscribe(x => { throw new Exception(); });
                  //
                  // Here, the OnNext(42) call would be scheduled on the trampoline, so when we
                  // return from the scheduled Subscribe call, the CurrentThreadScheduler moves
                  // on to invoking this work item. Too much protection here would cause the
                  // exception thrown in OnNext to circle back to OnError, which looks like the
                  // sequence can't make up its mind.
                  //
                  CurrentThreadScheduler.Instance.ScheduleAction(autoDetachObserver, ScheduledSubscribe);
              }
              else
              {
                  //
                  // No scheduling is required, so run the very same subscription logic
                  // directly. Sharing the helper keeps the error handling of the direct
                  // and the scheduled path identical.
                  //
                  ScheduledSubscribe(autoDetachObserver);
              }

              return autoDetachObserver;
          }

          /// <summary>
          /// Invokes <see cref="SubscribeCore"/> for the given wrapper and hands the resulting
          /// disposable to it. Used both as the scheduled work item and for the direct
          /// (non-scheduled) subscription path of <see cref="Subscribe"/>.
          /// </summary>
          /// <param name="autoDetachObserver">Wrapper around the subscribing observer; receives the subscription resource or the failure.</param>
          private void ScheduledSubscribe(AutoDetachObserver<T> autoDetachObserver)
          {
              try
              {
                  autoDetachObserver.SetResource(SubscribeCore(autoDetachObserver));
              }
              catch (Exception exception)
              {
                  //
                  // This can happen when there's a synchronous callback to OnError in the
                  // implementation of SubscribeCore, which also throws. So, we're seeing
                  // an exception being thrown from a handler.
                  //
                  // For compat with v1.x, we rethrow the exception when Fail returns false,
                  // keeping in mind this should be rare; if it happens, something has gone
                  // badly wrong. Using "throw;" preserves the original stack trace.
                  //
                  if (!autoDetachObserver.Fail(exception))
                  {
                      throw;
                  }
              }
          }

          /// <summary>
          /// Implement this method with the core subscription logic for the observable sequence.
          /// </summary>
          /// <param name="observer">Observer to send notifications to.</param>
          /// <returns>Disposable object representing an observer's subscription to the observable sequence.</returns>
          protected abstract IDisposable SubscribeCore(IObserver<T> observer);
      }
  }