Producer.cs 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. namespace System.Reactive
  7. {
  /// <summary>
  /// Covariant producer contract. The variance annotation on <typeparamref name="TSource"/> allows for
  /// better type checking when capabilities of a source are detected in SubscribeSafe.
  /// </summary>
  /// <typeparam name="TSource">Type of the resulting sequence's elements.</typeparam>
  internal interface IProducer<out TSource> : IObservable<TSource>
  {
      /// <summary>
      /// Subscribes the given observer, letting the caller choose whether the observer is safeguarded first.
      /// </summary>
      /// <param name="observer">Observer to send notifications on.</param>
      /// <param name="enableSafeguard">
      /// <c>true</c> to have the implementation wrap <paramref name="observer"/> in a safeguard before using it;
      /// <c>false</c> to use the observer as supplied.
      /// </param>
      /// <returns>IDisposable to cancel the subscription.</returns>
      IDisposable SubscribeRaw(IObserver<TSource> observer, bool enableSafeguard);
  }
  /// <summary>
  /// Base class for implementation of query operators, providing performance benefits over the use of Observable.Create.
  /// </summary>
  /// <typeparam name="TSource">Type of the resulting sequence's elements.</typeparam>
  internal abstract class BasicProducer<TSource> : IProducer<TSource>
  {
      /// <summary>
      /// Publicly visible Subscribe method.
      /// </summary>
      /// <param name="observer">Observer to send notifications on. The implementation of a producer must ensure the correct message grammar on the observer.</param>
      /// <returns>IDisposable to cancel the subscription. This causes the underlying sink to be notified of unsubscription, causing it to prevent further messages from being sent to the observer.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
      public IDisposable Subscribe(IObserver<TSource> observer)
      {
          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          return SubscribeRaw(observer, enableSafeguard: true);
      }

      /// <summary>
      /// Subscribes the observer, optionally wrapping it in a safeguard, and starts the operator by calling
      /// <see cref="Run"/>. The call is made directly, or through an action scheduled on
      /// <c>CurrentThreadScheduler.Instance</c> when <c>CurrentThreadScheduler.IsScheduleRequired</c> is <c>true</c>.
      /// </summary>
      /// <param name="observer">Observer to send notifications on. No null check is performed here.</param>
      /// <param name="enableSafeguard"><c>true</c> to wrap <paramref name="observer"/> using <c>SafeObserver&lt;TSource&gt;.Wrap</c> before it is passed to <see cref="Run"/>.</param>
      /// <returns>
      /// The disposable returned by <see cref="Run"/>, or, when the call was scheduled, a
      /// <c>SingleAssignmentDisposable</c> whose inner disposable is assigned once the scheduled action has invoked <see cref="Run"/>.
      /// </returns>
      public IDisposable SubscribeRaw(IObserver<TSource> observer, bool enableSafeguard)
      {
          IDisposable run;
          ISafeObserver<TSource>? safeObserver = null;

          //
          // See AutoDetachObserver.cs for more information on the safeguarding requirement and
          // its implementation aspects.
          //
          if (enableSafeguard)
          {
              observer = safeObserver = SafeObserver<TSource>.Wrap(observer);
          }

          if (CurrentThreadScheduler.IsScheduleRequired)
          {
              // Run has not been called yet, so return a placeholder that is filled in
              // by the scheduled action.
              var runAssignable = new SingleAssignmentDisposable();

              // State is passed as a tuple so the lambda can be static (no closure capture).
              // The tuple element is named '@this'; the call below must go through it.
              CurrentThreadScheduler.Instance.ScheduleAction(
                  (@this: this, runAssignable, observer),
                  static tuple => tuple.runAssignable.Disposable = tuple.@this.Run(tuple.observer));

              run = runAssignable;
          }
          else
          {
              run = Run(observer);
          }

          // Hand the subscription to the safeguard wrapper, if one was created.
          safeObserver?.SetResource(run);

          return run;
      }

      /// <summary>
      /// Core implementation of the query operator, called upon a new subscription to the producer object.
      /// </summary>
      /// <param name="observer">Observer to send notifications on. The implementation of a producer must ensure the correct message grammar on the observer.</param>
      /// <returns>Disposable representing all the resources and/or subscriptions the operator uses to process events.</returns>
      /// <remarks>The <paramref name="observer"/> passed in to this method is not protected using auto-detach behavior upon an OnError or OnCompleted call. The implementation must ensure proper resource disposal and enforce the message grammar.</remarks>
      protected abstract IDisposable Run(IObserver<TSource> observer);
  }
  /// <summary>
  /// Base class for query operators whose per-subscription state lives in a sink object. Each subscription
  /// creates a sink through <see cref="CreateSink"/>, passes it to <see cref="Run"/>, and returns that same
  /// sink to the subscriber as the subscription handle.
  /// </summary>
  /// <typeparam name="TTarget">Type of the resulting sequence's elements.</typeparam>
  /// <typeparam name="TSink">Type of the sink created for each subscription; it must be disposable because it is returned as the subscription.</typeparam>
  internal abstract class Producer<TTarget, TSink> : IProducer<TTarget>
      where TSink : IDisposable
  {
      /// <summary>
      /// Publicly visible Subscribe method.
      /// </summary>
      /// <param name="observer">Observer to send notifications on. The implementation of a producer must ensure the correct message grammar on the observer.</param>
      /// <returns>IDisposable to cancel the subscription. This causes the underlying sink to be notified of unsubscription, causing it to prevent further messages from being sent to the observer.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
      public IDisposable Subscribe(IObserver<TTarget> observer)
      {
          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          return SubscribeRaw(observer, enableSafeguard: true);
      }

      /// <summary>
      /// Subscribes the observer, optionally wrapping it in a safeguard, creates the sink for this subscription
      /// and starts the operator by calling <see cref="Run"/>. The call is made directly, or through an action
      /// scheduled on <c>CurrentThreadScheduler.Instance</c> when <c>CurrentThreadScheduler.IsScheduleRequired</c> is <c>true</c>.
      /// </summary>
      /// <param name="observer">Observer to send notifications on. No null check is performed here.</param>
      /// <param name="enableSafeguard"><c>true</c> to wrap <paramref name="observer"/> using <c>SafeObserver&lt;TTarget&gt;.Wrap</c> before it is passed to <see cref="CreateSink"/>.</param>
      /// <returns>The sink created for this subscription.</returns>
      public IDisposable SubscribeRaw(IObserver<TTarget> observer, bool enableSafeguard)
      {
          ISafeObserver<TTarget>? safeObserver = null;

          //
          // See AutoDetachObserver.cs for more information on the safeguarding requirement and
          // its implementation aspects.
          //
          if (enableSafeguard)
          {
              observer = safeObserver = SafeObserver<TTarget>.Wrap(observer);
          }

          var sink = CreateSink(observer);

          // The sink is handed to the safeguard wrapper (if any) before Run is invoked or scheduled.
          safeObserver?.SetResource(sink);

          if (CurrentThreadScheduler.IsScheduleRequired)
          {
              // State is passed as a tuple so the lambda can be static (no closure capture).
              // The tuple element is named '@this'; the call below must go through it.
              CurrentThreadScheduler.Instance.ScheduleAction(
                  (@this: this, sink),
                  static tuple => tuple.@this.Run(tuple.sink));
          }
          else
          {
              Run(sink);
          }

          return sink;
      }

      /// <summary>
      /// Core implementation of the query operator, called upon a new subscription to the producer object.
      /// </summary>
      /// <param name="sink">The sink object.</param>
      protected abstract void Run(TSink sink);

      /// <summary>
      /// Creates the sink for a new subscription.
      /// </summary>
      /// <param name="observer">Observer the sink is created for; already wrapped in the safeguard when safeguarding was requested.</param>
      /// <returns>The sink that is passed to <see cref="Run"/> and returned to the subscriber.</returns>
      protected abstract TSink CreateSink(IObserver<TTarget> observer);
  }
  118. }