// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.
#nullable disable
using System.ComponentModel;
using System.Reactive.Disposables;
using System.Threading;
namespace System.Reactive.Concurrency
{
/// <summary>
/// Provides basic synchronization and scheduling services for observable sequences.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Advanced)]
public static class Synchronization
{
#region SubscribeOn
// NOTE(review): the original doc had a line "The type of the elements in the source sequence." that reads like a
// type-parameter description, yet this signature shows no type parameter — confirm whether generic arguments were lost.
/// <summary>
/// Returns a sequence that performs the subscribe and unsubscribe work of <paramref name="source"/> on the given scheduler.
/// </summary>
/// <param name="source">Source sequence.</param>
/// <param name="scheduler">Scheduler on which subscription and unsubscription actions are performed.</param>
/// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
/// <remarks>
/// Only the side-effects of subscribing to the source sequence and of disposing subscriptions to it are run on the specified scheduler.
/// To have observer callbacks invoked on the scheduler instead, e.g. to offload callback processing to a dedicated thread,
/// use the <c>ObserveOn</c> overload that takes a scheduler.
/// </remarks>
public static IObservable SubscribeOn(IObservable source, IScheduler scheduler)
{
    // Arguments are validated in declaration order, so a null source is reported first.
    _ = source ?? throw new ArgumentNullException(nameof(source));
    _ = scheduler ?? throw new ArgumentNullException(nameof(scheduler));

    return new SubscribeOnObservable(source, scheduler);
}
/// <summary>
/// Observable wrapper that hands the act of subscribing to the wrapped source over to a scheduler.
/// </summary>
private sealed class SubscribeOnObservable : ObservableBase
{
    private readonly IObservable _source;
    private readonly IScheduler _scheduler;

    /// <summary>
    /// Stores the source to subscribe to and the scheduler that will run the subscribe step.
    /// </summary>
    public SubscribeOnObservable(IObservable source, IScheduler scheduler)
    {
        _source = source;
        _scheduler = scheduler;
    }

    /// <summary>
    /// Creates a fresh <see cref="Subscription"/> for the observer; its constructor schedules the actual subscribe call.
    /// </summary>
    protected override IDisposable SubscribeCore(IObserver observer) => new Subscription(_source, _scheduler, observer);

    /// <summary>
    /// Handle returned to the subscriber. Its single field holds whatever must be disposed to cancel the
    /// pending or active subscription.
    /// </summary>
    private sealed class Subscription : IDisposable
    {
        private IDisposable _cancel;

        public Subscription(IObservable source, IScheduler scheduler, IObserver observer)
        {
            // Queue the subscribe step. Everything the action needs is passed as explicit state
            // (this instance, the source and the observer) instead of being captured by the lambda.
            var scheduled = scheduler.Schedule(
                (self: this, source, observer),
                (closureScheduler, state) =>
                {
                    var upstream = state.source.SubscribeSafe(state.observer);

                    // Pair the upstream handle with the scheduler this action was invoked with.
                    // ScheduledDisposable is expected to run the disposal on that scheduler — see its documentation.
                    var onScheduler = new ScheduledDisposable(closureScheduler, upstream);

                    Disposable.TrySetSerial(ref state.self._cancel, onScheduler);
                    return Disposable.Empty;
                });

            // NOTE(review): if the action above already ran and stored its handle, TrySetSingle presumably
            // leaves that value in place — confirm against Disposable.TrySetSingle.
            Disposable.TrySetSingle(ref _cancel, scheduled);
        }

        /// <summary>
        /// Disposes whatever handle is currently held in the cancel field.
        /// </summary>
        public void Dispose() => Disposable.TryDispose(ref _cancel);
    }
}
// NOTE(review): the original doc had a line "The type of the elements in the source sequence." that reads like a
// type-parameter description, yet this signature shows no type parameter — confirm whether generic arguments were lost.
/// <summary>
/// Returns a sequence that performs the subscribe and unsubscribe work of <paramref name="source"/> on the given synchronization context.
/// </summary>
/// <param name="source">Source sequence.</param>
/// <param name="context">Synchronization context on which subscription and unsubscription actions are performed.</param>
/// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified synchronization context.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is null.</exception>
/// <remarks>
/// Only the side-effects of subscribing to the source sequence and of disposing subscriptions to it are run on the specified synchronization context.
/// To have observer callbacks invoked on the synchronization context instead, e.g. to post callbacks to a UI thread
/// represented by the context, use the <c>ObserveOn</c> overload that takes a synchronization context.
/// </remarks>
public static IObservable SubscribeOn(IObservable source, SynchronizationContext context)
{
    // Arguments are validated in declaration order, so a null source is reported first.
    _ = source ?? throw new ArgumentNullException(nameof(source));
    _ = context ?? throw new ArgumentNullException(nameof(context));

    return new SubscribeOnCtxObservable(source, context);
}
/// <summary>
/// Observable wrapper that hands the act of subscribing to the wrapped source over to a synchronization context.
/// </summary>
private sealed class SubscribeOnCtxObservable : ObservableBase
{
    private readonly IObservable _source;
    private readonly SynchronizationContext _context;

    /// <summary>
    /// Stores the source to subscribe to and the context that will run the subscribe step.
    /// </summary>
    public SubscribeOnCtxObservable(IObservable source, SynchronizationContext context)
    {
        _source = source;
        _context = context;
    }

    /// <summary>
    /// Creates a fresh <see cref="Subscription"/> for the observer; its constructor posts the actual subscribe call.
    /// </summary>
    protected override IDisposable SubscribeCore(IObserver observer) => new Subscription(_source, _context, observer);

    /// <summary>
    /// Handle returned to the subscriber. Keeps the source, observer and context so the posted callback
    /// can read them from the instance it receives as state.
    /// </summary>
    private sealed class Subscription : IDisposable
    {
        private readonly IObservable _source;
        private readonly IObserver _observer;
        private readonly SynchronizationContext _context;
        private IDisposable _cancel;

        public Subscription(IObservable source, SynchronizationContext context, IObserver observer)
        {
            _source = source;
            _observer = observer;
            _context = context;

            // The callback gets this instance as its state argument, so the lambda captures nothing.
            context.PostWithStartComplete(
                self =>
                {
                    // Nothing to do when the subscription was already disposed before the callback ran.
                    if (Disposable.GetIsDisposed(ref self._cancel))
                    {
                        return;
                    }

                    var upstream = self._source.SubscribeSafe(self._observer);

                    // ContextDisposable pairs the upstream handle with the context; it is expected to
                    // perform the disposal on that context — see its documentation.
                    Disposable.SetSingle(ref self._cancel, new ContextDisposable(self._context, upstream));
                },
                this);
        }

        /// <summary>
        /// Disposes whatever handle is currently held in the cancel field.
        /// </summary>
        public void Dispose() => Disposable.TryDispose(ref _cancel);
    }
}
#endregion
#region ObserveOn
// NOTE(review): the original doc had a line "The type of the elements in the source sequence." that reads like a
// type-parameter description, yet this signature shows no type parameter — confirm whether generic arguments were lost.
/// <summary>
/// Returns a sequence that delivers the observer callbacks of <paramref name="source"/> on the given scheduler.
/// </summary>
/// <param name="source">Source sequence.</param>
/// <param name="scheduler">Scheduler to notify observers on.</param>
/// <returns>The source sequence whose observations happen on the specified scheduler.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
public static IObservable ObserveOn(IObservable source, IScheduler scheduler)
{
    // Arguments are validated in declaration order, so a null source is reported first.
    _ = source ?? throw new ArgumentNullException(nameof(source));
    _ = scheduler ?? throw new ArgumentNullException(nameof(scheduler));

    // The long-running implementation is chosen whenever AsLongRunning yields a non-null result;
    // otherwise the regular scheduler-based implementation is used.
    var longRunning = scheduler.AsLongRunning();
    if (longRunning is null)
    {
        return new ObserveOn.Scheduler(source, scheduler);
    }

    return new ObserveOn.SchedulerLongRunning(source, longRunning);
}
// NOTE(review): the original doc had a line "The type of the elements in the source sequence." that reads like a
// type-parameter description, yet this signature shows no type parameter — confirm whether generic arguments were lost.
/// <summary>
/// Returns a sequence that delivers the observer callbacks of <paramref name="source"/> on the given synchronization context.
/// </summary>
/// <param name="source">Source sequence.</param>
/// <param name="context">Synchronization context to notify observers on.</param>
/// <returns>The source sequence whose observations happen on the specified synchronization context.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is null.</exception>
public static IObservable ObserveOn(IObservable source, SynchronizationContext context)
{
    // Arguments are validated in declaration order, so a null source is reported first.
    _ = source ?? throw new ArgumentNullException(nameof(source));
    _ = context ?? throw new ArgumentNullException(nameof(context));

    return new ObserveOn.Context(source, context);
}
#endregion
#region Synchronize
// NOTE(review): the original doc had a line "The type of the elements in the source sequence." that reads like a
// type-parameter description, yet this signature shows no type parameter — confirm whether generic arguments were lost.
/// <summary>
/// Returns a sequence that wraps <paramref name="source"/> so that observer callbacks are properly serialized.
/// </summary>
/// <param name="source">Source sequence.</param>
/// <returns>The source sequence whose outgoing calls to observers are synchronized.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static IObservable Synchronize(IObservable source)
{
    _ = source ?? throw new ArgumentNullException(nameof(source));

    return new Synchronize(source);
}
// NOTE(review): the original doc had a line "The type of the elements in the source sequence." that reads like a
// type-parameter description, yet this signature shows no type parameter — confirm whether generic arguments were lost.
/// <summary>
/// Returns a sequence that wraps <paramref name="source"/> so that observer callbacks are synchronized using the specified gate object.
/// </summary>
/// <param name="source">Source sequence.</param>
/// <param name="gate">Gate object to synchronize each observer call on.</param>
/// <returns>The source sequence whose outgoing calls to observers are synchronized on the given gate object.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="gate"/> is null.</exception>
public static IObservable Synchronize(IObservable source, object gate)
{
    // Arguments are validated in declaration order, so a null source is reported first.
    _ = source ?? throw new ArgumentNullException(nameof(source));
    _ = gate ?? throw new ArgumentNullException(nameof(gate));

    return new Synchronize(source, gate);
}
#endregion
}
}