// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. #nullable disable using System.ComponentModel; using System.Reactive.Disposables; using System.Threading; namespace System.Reactive.Concurrency { /// /// Provides basic synchronization and scheduling services for observable sequences. /// [EditorBrowsable(EditorBrowsableState.Advanced)] public static class Synchronization { #region SubscribeOn /// /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified scheduler. /// /// The type of the elements in the source sequence. /// Source sequence. /// Scheduler to perform subscription and unsubscription actions on. /// The source sequence whose subscriptions and unsubscriptions happen on the specified scheduler. /// or is null. /// /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified scheduler. /// In order to invoke observer callbacks on the specified scheduler, e.g. to offload callback processing to a dedicated thread, use . /// public static IObservable SubscribeOn(IObservable source, IScheduler scheduler) { if (source == null) { throw new ArgumentNullException(nameof(source)); } if (scheduler == null) { throw new ArgumentNullException(nameof(scheduler)); } return new SubscribeOnObservable(source, scheduler); } private sealed class SubscribeOnObservable : ObservableBase { private sealed class Subscription : IDisposable { private IDisposable _cancel; public Subscription(IObservable source, IScheduler scheduler, IObserver observer) { Disposable.TrySetSingle( ref _cancel, scheduler.Schedule( (@this: this, source, observer), (closureScheduler, state) => { Disposable.TrySetSerial(ref state.@this._cancel, new ScheduledDisposable(closureScheduler, state.source.SubscribeSafe(state.observer))); return Disposable.Empty; })); } public void Dispose() { Disposable.TryDispose(ref _cancel); } } private readonly IObservable _source; private readonly IScheduler _scheduler; public SubscribeOnObservable(IObservable source, IScheduler scheduler) { _source = source; _scheduler = scheduler; } protected override IDisposable SubscribeCore(IObserver observer) { return new Subscription(_source, _scheduler, observer); } } /// /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified synchronization context. /// /// The type of the elements in the source sequence. /// Source sequence. /// Synchronization context to perform subscription and unsubscription actions on. /// The source sequence whose subscriptions and unsubscriptions happen on the specified synchronization context. /// or is null. /// /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified synchronization context. /// In order to invoke observer callbacks on the specified synchronization context, e.g. to post callbacks to a UI thread represented by the synchronization context, use . /// public static IObservable SubscribeOn(IObservable source, SynchronizationContext context) { if (source == null) { throw new ArgumentNullException(nameof(source)); } if (context == null) { throw new ArgumentNullException(nameof(context)); } return new SubscribeOnCtxObservable(source, context); } private sealed class SubscribeOnCtxObservable : ObservableBase { private sealed class Subscription : IDisposable { private readonly IObservable _source; private readonly IObserver _observer; private readonly SynchronizationContext _context; private IDisposable _cancel; public Subscription(IObservable source, SynchronizationContext context, IObserver observer) { _source = source; _context = context; _observer = observer; context.PostWithStartComplete( @this => { if (!Disposable.GetIsDisposed(ref @this._cancel)) { Disposable.SetSingle(ref @this._cancel, new ContextDisposable(@this._context, @this._source.SubscribeSafe(@this._observer))); } }, this); } public void Dispose() { Disposable.TryDispose(ref _cancel); } } private readonly IObservable _source; private readonly SynchronizationContext _context; public SubscribeOnCtxObservable(IObservable source, SynchronizationContext context) { _source = source; _context = context; } protected override IDisposable SubscribeCore(IObserver observer) { return new Subscription(_source, _context, observer); } } #endregion #region ObserveOn /// /// Wraps the source sequence in order to run its observer callbacks on the specified scheduler. /// /// The type of the elements in the source sequence. /// Source sequence. /// Scheduler to notify observers on. /// The source sequence whose observations happen on the specified scheduler. /// or is null. public static IObservable ObserveOn(IObservable source, IScheduler scheduler) { if (source == null) { throw new ArgumentNullException(nameof(source)); } if (scheduler == null) { throw new ArgumentNullException(nameof(scheduler)); } var longRunning = scheduler.AsLongRunning(); if (longRunning != null) { return new ObserveOn.SchedulerLongRunning(source, longRunning); } return new ObserveOn.Scheduler(source, scheduler); } /// /// Wraps the source sequence in order to run its observer callbacks on the specified synchronization context. /// /// The type of the elements in the source sequence. /// Source sequence. /// Synchronization context to notify observers on. /// The source sequence whose observations happen on the specified synchronization context. /// or is null. public static IObservable ObserveOn(IObservable source, SynchronizationContext context) { if (source == null) { throw new ArgumentNullException(nameof(source)); } if (context == null) { throw new ArgumentNullException(nameof(context)); } return new ObserveOn.Context(source, context); } #endregion #region Synchronize /// /// Wraps the source sequence in order to ensure observer callbacks are properly serialized. /// /// The type of the elements in the source sequence. /// Source sequence. /// The source sequence whose outgoing calls to observers are synchronized. /// is null. public static IObservable Synchronize(IObservable source) { if (source == null) { throw new ArgumentNullException(nameof(source)); } return new Synchronize(source); } /// /// Wraps the source sequence in order to ensure observer callbacks are synchronized using the specified gate object. /// /// The type of the elements in the source sequence. /// Source sequence. /// Gate object to synchronize each observer call on. /// The source sequence whose outgoing calls to observers are synchronized on the given gate object. /// or is null. public static IObservable Synchronize(IObservable source, object gate) { if (source == null) { throw new ArgumentNullException(nameof(source)); } if (gate == null) { throw new ArgumentNullException(nameof(gate)); } return new Synchronize(source, gate); } #endregion } }