| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
- using System.Reactive.Disposables;
- using System.Reactive.Linq;
- namespace System.Reactive.Subjects
- {
- /// <summary>
- /// Represents an observable wrapper that can be connected and disconnected from its underlying observable sequence.
- /// </summary>
- /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
- /// <typeparam name="TResult">The type of the elements in the resulting sequence, after transformation through the subject.</typeparam>
- internal class ConnectableObservable<TSource, TResult> : IConnectableObservable<TResult>
- {
- private readonly ISubject<TSource, TResult> _subject;
- private readonly IObservable<TSource> _source;
- private readonly object _gate;
- private Connection _connection;
- /// <summary>
- /// Creates an observable that can be connected and disconnected from its source.
- /// </summary>
- /// <param name="source">Underlying observable source sequence that can be connected and disconnected from the wrapper.</param>
- /// <param name="subject">Subject exposed by the connectable observable, receiving data from the underlying source sequence upon connection.</param>
- public ConnectableObservable(IObservable<TSource> source, ISubject<TSource, TResult> subject)
- {
- _subject = subject;
- _source = source.AsObservable(); // This gets us auto-detach behavior; otherwise, we'd have to roll our own, including trampoline installation.
- _gate = new object();
- }
- /// <summary>
- /// Connects the observable wrapper to its source. All subscribed observers will receive values from the underlying observable sequence as long as the connection is established.
- /// </summary>
- /// <returns>Disposable object used to disconnect the observable wrapper from its source, causing subscribed observer to stop receiving values from the underlying observable sequence.</returns>
- public IDisposable Connect()
- {
- lock (_gate)
- {
- if (_connection == null)
- {
- var subscription = _source.SubscribeSafe(_subject);
- _connection = new Connection(this, subscription);
- }
- return _connection;
- }
- }
- class Connection : IDisposable
- {
- private readonly ConnectableObservable<TSource, TResult> _parent;
- private IDisposable _subscription;
- public Connection(ConnectableObservable<TSource, TResult> parent, IDisposable subscription)
- {
- _parent = parent;
- _subscription = subscription;
- }
- public void Dispose()
- {
- lock (_parent._gate)
- {
- if (_subscription != null)
- {
- _subscription.Dispose();
- _subscription = null;
- _parent._connection = null;
- }
- }
- }
- }
- /// <summary>
- /// Subscribes an observer to the observable sequence. No values from the underlying observable source will be received unless a connection was established through the Connect method.
- /// </summary>
- /// <param name="observer">Observer that will receive values from the underlying observable source when the current ConnectableObservable instance is connected through a call to Connect.</param>
- /// <returns>Disposable used to unsubscribe from the observable sequence.</returns>
- public IDisposable Subscribe(IObserver<TResult> observer)
- {
- if (observer == null)
- throw new ArgumentNullException("observer");
- return _subject.SubscribeSafe(observer);
- }
- }
- }
|