// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. using System.Reactive.Concurrency; using System.Reactive.Disposables; namespace System.Reactive { /// /// Interface with variance annotation; allows for better type checking when detecting capabilities in SubscribeSafe. /// /// Type of the resulting sequence's elements. internal interface IProducer : IObservable { IDisposable SubscribeRaw(IObserver observer, bool enableSafeguard); } /// /// Base class for implementation of query operators, providing performance benefits over the use of Observable.Create. /// /// Type of the resulting sequence's elements. internal abstract class BasicProducer : IProducer { /// /// Publicly visible Subscribe method. /// /// Observer to send notifications on. The implementation of a producer must ensure the correct message grammar on the observer. /// IDisposable to cancel the subscription. This causes the underlying sink to be notified of unsubscription, causing it to prevent further messages from being sent to the observer. public IDisposable Subscribe(IObserver observer) { if (observer == null) { throw new ArgumentNullException(nameof(observer)); } return SubscribeRaw(observer, enableSafeguard: true); } public IDisposable SubscribeRaw(IObserver observer, bool enableSafeguard) { IDisposable run; ISafeObserver? safeObserver = null; // // See AutoDetachObserver.cs for more information on the safeguarding requirement and // its implementation aspects. // if (enableSafeguard) { observer = safeObserver = SafeObserver.Wrap(observer); } if (CurrentThreadScheduler.IsScheduleRequired) { var runAssignable = new SingleAssignmentDisposable(); CurrentThreadScheduler.Instance.ScheduleAction( (@this: this, runAssignable, observer), static tuple => tuple.runAssignable.Disposable = tuple.@this.Run(tuple.observer)); run = runAssignable; } else { run = Run(observer); } safeObserver?.SetResource(run); return run; } /// /// Core implementation of the query operator, called upon a new subscription to the producer object. /// /// Observer to send notifications on. The implementation of a producer must ensure the correct message grammar on the observer. /// Disposable representing all the resources and/or subscriptions the operator uses to process events. /// The observer passed in to this method is not protected using auto-detach behavior upon an OnError or OnCompleted call. The implementation must ensure proper resource disposal and enforce the message grammar. protected abstract IDisposable Run(IObserver observer); } internal abstract class Producer : IProducer where TSink : IDisposable { /// /// Publicly visible Subscribe method. /// /// Observer to send notifications on. The implementation of a producer must ensure the correct message grammar on the observer. /// IDisposable to cancel the subscription. This causes the underlying sink to be notified of unsubscription, causing it to prevent further messages from being sent to the observer. public IDisposable Subscribe(IObserver observer) { if (observer == null) { throw new ArgumentNullException(nameof(observer)); } return SubscribeRaw(observer, enableSafeguard: true); } public IDisposable SubscribeRaw(IObserver observer, bool enableSafeguard) { ISafeObserver? safeObserver = null; // // See AutoDetachObserver.cs for more information on the safeguarding requirement and // its implementation aspects. // if (enableSafeguard) { observer = safeObserver = SafeObserver.Wrap(observer); } var sink = CreateSink(observer); safeObserver?.SetResource(sink); if (CurrentThreadScheduler.IsScheduleRequired) { CurrentThreadScheduler.Instance.ScheduleAction( (@this: this, sink), static tuple => tuple.@this.Run(tuple.sink)); } else { Run(sink); } return sink; } /// /// Core implementation of the query operator, called upon a new subscription to the producer object. /// /// The sink object. protected abstract void Run(TSink sink); protected abstract TSink CreateSink(IObserver observer); } }