// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.ComponentModel;
using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Concurrency
{
    /// <summary>
    /// Provides basic synchronization and scheduling services for observable sequences.
    /// </summary>
    [EditorBrowsable(EditorBrowsableState.Advanced)]
    public static class Synchronization
    {
        #region SubscribeOn

        /// <summary>
        /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified scheduler.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="scheduler">Scheduler to perform subscription and unsubscription actions on.</param>
        /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
        /// <remarks>
        /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified scheduler.
        /// In order to invoke observer callbacks on the specified scheduler, e.g. to offload callback processing to a dedicated thread, use <see cref="ObserveOn{TSource}(IObservable{TSource}, IScheduler)"/>.
        /// </remarks>
        public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, IScheduler scheduler)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return new AnonymousObservable<TSource>(observer =>
            {
                var m = new SingleAssignmentDisposable();
                var d = new SerialDisposable();

                // The returned handle first holds the scheduled work item (through m); once the
                // scheduled action runs, it is swapped for the wrapper around the real subscription.
                d.Disposable = m;

                m.Disposable = scheduler.Schedule(() =>
                {
                    // ScheduledDisposable is handed the scheduler as well as the subscription.
                    // NOTE(review): presumably it disposes the subscription on that scheduler - confirm.
                    d.Disposable = new ScheduledDisposable(scheduler, source.SubscribeSafe(observer));
                });

                return d;
            });
        }

        /// <summary>
        /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified synchronization context.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="context">Synchronization context to perform subscription and unsubscription actions on.</param>
        /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified synchronization context.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is null.</exception>
        /// <remarks>
        /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified synchronization context.
        /// In order to invoke observer callbacks on the specified synchronization context, e.g. to post callbacks to a UI thread represented by the synchronization context, use <see cref="ObserveOn{TSource}(IObservable{TSource}, SynchronizationContext)"/>.
        /// </remarks>
        public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, SynchronizationContext context)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (context == null)
                throw new ArgumentNullException(nameof(context));

            return new AnonymousObservable<TSource>(observer =>
            {
                var subscription = new SingleAssignmentDisposable();

                context.PostWithStartComplete(() =>
                {
                    // Skip subscribing altogether when the caller already disposed the handle
                    // before the posted callback got to run.
                    if (!subscription.IsDisposed)
                    {
                        subscription.Disposable = new ContextDisposable(context, source.SubscribeSafe(observer));
                    }
                });

                return subscription;
            });
        }

        #endregion

        #region ObserveOn

        /// <summary>
        /// Wraps the source sequence in order to run its observer callbacks on the specified scheduler.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="scheduler">Scheduler to notify observers on.</param>
        /// <returns>The source sequence whose observations happen on the specified scheduler.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
        public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, IScheduler scheduler)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            // Builds without NO_PERF use the dedicated ObserveOn operator type; NO_PERF builds
            // fall back to an anonymous observable wrapping an ObserveOnObserver.
#if !NO_PERF
            return new ObserveOn<TSource>(source, scheduler);
#else
            return new AnonymousObservable<TSource>(observer => source.Subscribe(new ObserveOnObserver<TSource>(scheduler, observer, null)));
#endif
        }

        /// <summary>
        /// Wraps the source sequence in order to run its observer callbacks on the specified synchronization context.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="context">Synchronization context to notify observers on.</param>
        /// <returns>The source sequence whose observations happen on the specified synchronization context.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is null.</exception>
        public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, SynchronizationContext context)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (context == null)
                throw new ArgumentNullException(nameof(context));

#if !NO_PERF
            return new ObserveOn<TSource>(source, context);
#else
            return new AnonymousObservable<TSource>(observer =>
            {
                // Bracket the lifetime of the subscription with OperationStarted/OperationCompleted
                // on the context; every notification is forwarded through context.Post.
                context.OperationStarted();

                return source.Subscribe(
                    x => context.Post(_ =>
                    {
                        observer.OnNext(x);
                    }, null),
                    exception => context.Post(_ =>
                    {
                        observer.OnError(exception);
                    }, null),
                    () => context.Post(_ =>
                    {
                        observer.OnCompleted();
                    }, null)
                ).Finally(() =>
                {
                    context.OperationCompleted();
                });
            });
#endif
        }

        #endregion

        #region Synchronize

        /// <summary>
        /// Wraps the source sequence in order to ensure observer callbacks are properly serialized.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <returns>The source sequence whose outgoing calls to observers are synchronized.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));

#if !NO_PERF
            return new Synchronize<TSource>(source);
#else
            return new AnonymousObservable<TSource>(observer =>
            {
                // A fresh gate object is created for each subscription.
                var gate = new object();
                return source.Subscribe(Observer.Synchronize(observer, gate));
            });
#endif
        }

        /// <summary>
        /// Wraps the source sequence in order to ensure observer callbacks are synchronized using the specified gate object.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="gate">Gate object to synchronize each observer call on.</param>
        /// <returns>The source sequence whose outgoing calls to observers are synchronized on the given gate object.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="gate"/> is null.</exception>
        public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source, object gate)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (gate == null)
                throw new ArgumentNullException(nameof(gate));

#if !NO_PERF
            return new Synchronize<TSource>(source, gate);
#else
            return new AnonymousObservable<TSource>(observer =>
            {
                return source.Subscribe(Observer.Synchronize(observer, gate));
            });
#endif
        }

        #endregion
    }
}