// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
using System.ComponentModel;
using System.Reactive.Disposables;
using System.Threading;
namespace System.Reactive.Concurrency
{
///
/// Provides basic synchronization and scheduling services for observable sequences.
///
[EditorBrowsable(EditorBrowsableState.Advanced)]
public static class Synchronization
{
#region SubscribeOn
///
/// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified scheduler.
///
/// The type of the elements in the source sequence.
/// Source sequence.
/// Scheduler to perform subscription and unsubscription actions on.
/// The source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.
/// or is null.
///
/// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified scheduler.
/// In order to invoke observer callbacks on the specified scheduler, e.g. to offload callback processing to a dedicated thread, use .
///
public static IObservable SubscribeOn(IObservable source, IScheduler scheduler)
{
if (source == null)
throw new ArgumentNullException(nameof(source));
if (scheduler == null)
throw new ArgumentNullException(nameof(scheduler));
return new AnonymousObservable(observer =>
{
var m = new SingleAssignmentDisposable();
var d = new SerialDisposable();
d.Disposable = m;
m.Disposable = scheduler.Schedule(() =>
{
d.Disposable = new ScheduledDisposable(scheduler, source.SubscribeSafe(observer));
});
return d;
});
}
///
/// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified synchronization context.
///
/// The type of the elements in the source sequence.
/// Source sequence.
/// Synchronization context to perform subscription and unsubscription actions on.
/// The source sequence whose subscriptions and unsubscriptions happen on the specified synchronization context.
/// or is null.
///
/// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified synchronization context.
/// In order to invoke observer callbacks on the specified synchronization context, e.g. to post callbacks to a UI thread represented by the synchronization context, use .
///
public static IObservable SubscribeOn(IObservable source, SynchronizationContext context)
{
if (source == null)
throw new ArgumentNullException(nameof(source));
if (context == null)
throw new ArgumentNullException(nameof(context));
return new AnonymousObservable(observer =>
{
var subscription = new SingleAssignmentDisposable();
context.PostWithStartComplete(() =>
{
if (!subscription.IsDisposed)
{
subscription.Disposable = new ContextDisposable(context, source.SubscribeSafe(observer));
}
});
return subscription;
});
}
#endregion
#region ObserveOn
///
/// Wraps the source sequence in order to run its observer callbacks on the specified scheduler.
///
/// The type of the elements in the source sequence.
/// Source sequence.
/// Scheduler to notify observers on.
/// The source sequence whose observations happen on the specified scheduler.
/// or is null.
public static IObservable ObserveOn(IObservable source, IScheduler scheduler)
{
if (source == null)
throw new ArgumentNullException(nameof(source));
if (scheduler == null)
throw new ArgumentNullException(nameof(scheduler));
#if !NO_PERF
return new ObserveOn(source, scheduler);
#else
return new AnonymousObservable(observer => source.Subscribe(new ObserveOnObserver(scheduler, observer, null)));
#endif
}
///
/// Wraps the source sequence in order to run its observer callbacks on the specified synchronization context.
///
/// The type of the elements in the source sequence.
/// Source sequence.
/// Synchronization context to notify observers on.
/// The source sequence whose observations happen on the specified synchronization context.
/// or is null.
public static IObservable ObserveOn(IObservable source, SynchronizationContext context)
{
if (source == null)
throw new ArgumentNullException(nameof(source));
if (context == null)
throw new ArgumentNullException(nameof(context));
#if !NO_PERF
return new ObserveOn(source, context);
#else
return new AnonymousObservable(observer =>
{
context.OperationStarted();
return source.Subscribe(
x => context.Post(_ =>
{
observer.OnNext(x);
}, null),
exception => context.Post(_ =>
{
observer.OnError(exception);
}, null),
() => context.Post(_ =>
{
observer.OnCompleted();
}, null)
).Finally(() =>
{
context.OperationCompleted();
});
});
#endif
}
#endregion
#region Synchronize
///
/// Wraps the source sequence in order to ensure observer callbacks are properly serialized.
///
/// The type of the elements in the source sequence.
/// Source sequence.
/// The source sequence whose outgoing calls to observers are synchronized.
/// is null.
public static IObservable Synchronize(IObservable source)
{
if (source == null)
throw new ArgumentNullException(nameof(source));
#if !NO_PERF
return new Synchronize(source);
#else
return new AnonymousObservable(observer =>
{
var gate = new object();
return source.Subscribe(Observer.Synchronize(observer, gate));
});
#endif
}
///
/// Wraps the source sequence in order to ensure observer callbacks are synchronized using the specified gate object.
///
/// The type of the elements in the source sequence.
/// Source sequence.
/// Gate object to synchronize each observer call on.
/// The source sequence whose outgoing calls to observers are synchronized on the given gate object.
/// or is null.
public static IObservable Synchronize(IObservable source, object gate)
{
if (source == null)
throw new ArgumentNullException(nameof(source));
if (gate == null)
throw new ArgumentNullException(nameof(gate));
#if !NO_PERF
return new Synchronize(source, gate);
#else
return new AnonymousObservable(observer =>
{
return source.Subscribe(Observer.Synchronize(observer, gate));
});
#endif
}
#endregion
}
}