Producer.cs 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System.Reactive.Concurrency;
  4. using System.Reactive.Disposables;
  5. namespace System.Reactive
  6. {
  /// <summary>
  /// Observable sequence that also exposes <see cref="SubscribeRaw"/>. Unless NO_VARIANCE is defined, the type
  /// parameter carries a variance annotation, which allows for better type checking when detecting capabilities
  /// in SubscribeSafe.
  /// </summary>
  /// <typeparam name="TSource">Type of the resulting sequence's elements.</typeparam>
#if !NO_VARIANCE
  internal interface IProducer<out TSource> : IObservable<TSource>
#else
  internal interface IProducer<TSource> : IObservable<TSource>
#endif
  {
    /// <summary>
    /// Subscribes an observer while letting the caller decide whether the observer should be safeguarded.
    /// </summary>
    /// <param name="observer">Observer to send notifications on.</param>
    /// <param name="enableSafeguard">
    /// Whether to safeguard the observer. In <c>Producer&lt;TSource&gt;</c>, passing true wraps the observer
    /// through <c>SafeObserver&lt;TSource&gt;.Create</c> before the operator logic receives it.
    /// </param>
    /// <returns>IDisposable representing the subscription.</returns>
    IDisposable SubscribeRaw(IObserver<TSource> observer, bool enableSafeguard);
  }
  /// <summary>
  /// Base class for implementation of query operators, providing performance benefits over the use of Observable.Create.
  /// Derived operators supply their logic by overriding the protected <c>Run</c> method; this class takes care of
  /// creating the disposables for a subscription and of choosing whether to run inline or through the
  /// current-thread scheduler.
  /// </summary>
  /// <typeparam name="TSource">Type of the resulting sequence's elements.</typeparam>
  internal abstract class Producer<TSource> : IProducer<TSource>
  {
    /// <summary>
    /// Everything a run of the operator needs, bundled so it can be passed as the state argument when the run
    /// is scheduled on the current-thread scheduler.
    /// </summary>
    struct State
    {
      public IObserver<TSource> observer;
      public SingleAssignmentDisposable sink;
      public SingleAssignmentDisposable subscription;

      /// <summary>
      /// Stores the given disposable in the sink slot; handed to <c>Run</c> as its <c>setSink</c> callback.
      /// </summary>
      /// <param name="s">Disposable to assign to the sink slot.</param>
      public void Assign(IDisposable s)
      {
        sink.Disposable = s;
      }
    }

    /// <summary>
    /// Publicly visible Subscribe method. Rejects a null observer and subscribes with safeguarding enabled.
    /// </summary>
    /// <param name="observer">Observer to send notifications on. The implementation of a producer must ensure the correct message grammar on the observer.</param>
    /// <returns>IDisposable to cancel the subscription. This causes the underlying sink to be notified of unsubscription, causing it to prevent further messages from being sent to the observer.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
    public IDisposable Subscribe(IObserver<TSource> observer)
    {
      if (observer == null)
      {
        throw new ArgumentNullException("observer");
      }

      // Subscriptions made through the public entry point always get the safeguarded observer.
      return SubscribeRaw(observer, true);
    }

    /// <summary>
    /// Sets up the disposables for a new subscription and starts the operator, either inline or by scheduling
    /// the start on <c>CurrentThreadScheduler.Instance</c> when <c>CurrentThreadScheduler.IsScheduleRequired</c> is true.
    /// </summary>
    /// <param name="observer">Observer to send notifications on. Not checked for null here.</param>
    /// <param name="enableSafeguard">When true, the observer is wrapped through <c>SafeObserver&lt;TSource&gt;.Create</c> before being handed to <c>Run</c>.</param>
    /// <returns>Composite disposable made of the sink slot and the subscription slot.</returns>
    public IDisposable SubscribeRaw(IObserver<TSource> observer, bool enableSafeguard)
    {
      var sinkSlot = new SingleAssignmentDisposable();
      var subscriptionSlot = new SingleAssignmentDisposable();
      var composite = StableCompositeDisposable.Create(sinkSlot, subscriptionSlot);

      //
      // See AutoDetachObserver.cs for more information on the safeguarding requirement and
      // its implementation aspects.
      //
      IObserver<TSource> target = observer;
      if (enableSafeguard)
      {
        target = SafeObserver<TSource>.Create(observer, composite);
      }

      var state = new State
      {
        observer = target,
        sink = sinkSlot,
        subscription = subscriptionSlot
      };

      if (!CurrentThreadScheduler.IsScheduleRequired)
      {
        // No scheduling needed: run the operator right here and record what it returns.
        subscriptionSlot.Disposable = Run(target, subscriptionSlot, state.Assign);
        return composite;
      }

      CurrentThreadScheduler.Instance.Schedule(state, ScheduledRun);
      return composite;
    }

    /// <summary>
    /// Scheduler callback used by <see cref="SubscribeRaw"/>: starts the operator with the captured state and
    /// stores the result in the subscription slot.
    /// </summary>
    /// <param name="scheduler">Scheduler invoking the callback; not used.</param>
    /// <param name="deferred">State captured when the subscription was requested.</param>
    /// <returns>Always <c>Disposable.Empty</c>; the operator's own disposable is kept in the subscription slot instead.</returns>
    private IDisposable ScheduledRun(IScheduler scheduler, State deferred)
    {
      var subscriptionSlot = deferred.subscription;
      subscriptionSlot.Disposable = Run(deferred.observer, subscriptionSlot, deferred.Assign);
      return Disposable.Empty;
    }

    /// <summary>
    /// Core implementation of the query operator, called upon a new subscription to the producer object.
    /// </summary>
    /// <param name="observer">Observer to send notifications on. The implementation of a producer must ensure the correct message grammar on the observer.</param>
    /// <param name="cancel">The subscription disposable object returned from the Run call, passed in such that it can be forwarded to the sink, allowing it to dispose the subscription upon sending a final message (or prematurely for other reasons).</param>
    /// <param name="setSink">Callback to communicate the sink object to the subscriber, allowing consumers to tunnel a Dispose call into the sink, which can stop the processing.</param>
    /// <returns>Disposable representing all the resources and/or subscriptions the operator uses to process events.</returns>
    /// <remarks>The <paramref name="observer"/> passed in to this method is not protected using auto-detach behavior upon an OnError or OnCompleted call. The implementation must ensure proper resource disposal and enforce the message grammar.</remarks>
    protected abstract IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink);
  }
  86. }
  87. #endif