| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 | 
							- // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
 
- using System.Reactive.Disposables;
 
- using System.Reactive.Linq;
 
- namespace System.Reactive.Subjects
 
- {
 
-     /// <summary>
 
-     /// Represents an observable wrapper that can be connected and disconnected from its underlying observable sequence.
 
-     /// </summary>
 
-     /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
 
-     /// <typeparam name="TResult">The type of the elements in the resulting sequence, after transformation through the subject.</typeparam>
 
-     internal class ConnectableObservable<TSource, TResult> : IConnectableObservable<TResult>
 
-     {
 
-         private readonly ISubject<TSource, TResult> _subject;
 
-         private readonly IObservable<TSource> _source;
 
-         private readonly object _gate;
 
-         private Connection _connection;
 
-         /// <summary>
 
-         /// Creates an observable that can be connected and disconnected from its source.
 
-         /// </summary>
 
-         /// <param name="source">Underlying observable source sequence that can be connected and disconnected from the wrapper.</param>
 
-         /// <param name="subject">Subject exposed by the connectable observable, receiving data from the underlying source sequence upon connection.</param>
 
-         public ConnectableObservable(IObservable<TSource> source, ISubject<TSource, TResult> subject)
 
-         {
 
-             _subject = subject;
 
-             _source = source.AsObservable(); // This gets us auto-detach behavior; otherwise, we'd have to roll our own, including trampoline installation.
 
-             _gate = new object();
 
-         }
 
-         /// <summary>
 
-         /// Connects the observable wrapper to its source. All subscribed observers will receive values from the underlying observable sequence as long as the connection is established.
 
-         /// </summary>
 
-         /// <returns>Disposable object used to disconnect the observable wrapper from its source, causing subscribed observer to stop receiving values from the underlying observable sequence.</returns>
 
-         public IDisposable Connect()
 
-         {
 
-             lock (_gate)
 
-             {
 
-                 if (_connection == null)
 
-                 {
 
-                     var subscription = _source.SubscribeSafe(_subject);
 
-                     _connection = new Connection(this, subscription);
 
-                 }
 
-                 return _connection;
 
-             }
 
-         }
 
-         class Connection : IDisposable
 
-         {
 
-             private readonly ConnectableObservable<TSource, TResult> _parent;
 
-             private IDisposable _subscription;
 
-             public Connection(ConnectableObservable<TSource, TResult> parent, IDisposable subscription)
 
-             {
 
-                 _parent = parent;
 
-                 _subscription = subscription;
 
-             }
 
-             public void Dispose()
 
-             {
 
-                 lock (_parent._gate)
 
-                 {
 
-                     if (_subscription != null)
 
-                     {
 
-                         _subscription.Dispose();
 
-                         _subscription = null;
 
-                         _parent._connection = null;
 
-                     }
 
-                 }
 
-             }
 
-         }
 
-         /// <summary>
 
-         /// Subscribes an observer to the observable sequence. No values from the underlying observable source will be received unless a connection was established through the Connect method.
 
-         /// </summary>
 
-         /// <param name="observer">Observer that will receive values from the underlying observable source when the current ConnectableObservable instance is connected through a call to Connect.</param>
 
-         /// <returns>Disposable used to unsubscribe from the observable sequence.</returns>
 
-         public IDisposable Subscribe(IObserver<TResult> observer)
 
-         {
 
-             if (observer == null)
 
-                 throw new ArgumentNullException("observer");
 
-             return _subject.SubscribeSafe(observer);
 
-         }
 
-     }
 
- }
 
 
  |