ConnectableObservable.cs 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Reactive.Disposables;
  3. using System.Reactive.Linq;
  4. namespace System.Reactive.Subjects
  5. {
  /// <summary>
  /// Represents an observable wrapper that can be connected and disconnected from its underlying observable sequence.
  /// </summary>
  /// <remarks>
  /// Observers subscribe to the subject passed to the constructor; the subject itself is only subscribed to the
  /// source while a connection obtained from <see cref="Connect"/> is active.
  /// </remarks>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <typeparam name="TResult">The type of the elements in the resulting sequence, after transformation through the subject.</typeparam>
  internal class ConnectableObservable<TSource, TResult> : IConnectableObservable<TResult>
  {
      // Guards _connection as well as the subscription held by the nested Connection.
      private readonly object _gate = new object();
      private readonly IObservable<TSource> _source;
      private readonly ISubject<TSource, TResult> _subject;

      // The currently active connection, or null while disconnected.
      private Connection _connection;

      /// <summary>
      /// Creates an observable that can be connected and disconnected from its source.
      /// </summary>
      /// <param name="source">Underlying observable source sequence that can be connected and disconnected from the wrapper.</param>
      /// <param name="subject">Subject exposed by the connectable observable, receiving data from the underlying source sequence upon connection.</param>
      /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="subject"/> is null.</exception>
      public ConnectableObservable(IObservable<TSource> source, ISubject<TSource, TResult> subject)
      {
          // Fail fast here rather than at the first Connect/Subscribe call, where a null
          // argument would surface far away from the faulty call site.
          if (source == null)
          {
              throw new ArgumentNullException("source");
          }

          if (subject == null)
          {
              throw new ArgumentNullException("subject");
          }

          _subject = subject;
          _source = source.AsObservable(); // This gets us auto-detach behavior; otherwise, we'd have to roll our own, including trampoline installation.
      }

      /// <summary>
      /// Connects the observable wrapper to its source. All subscribed observers will receive values from the underlying observable sequence as long as the connection is established.
      /// </summary>
      /// <remarks>
      /// While a connection is active, further calls return that same connection object instead of subscribing to the source again.
      /// </remarks>
      /// <returns>Disposable object used to disconnect the observable wrapper from its source, causing subscribed observer to stop receiving values from the underlying observable sequence.</returns>
      public IDisposable Connect()
      {
          lock (_gate)
          {
              if (_connection == null)
              {
                  // Not connected yet: hook the subject up to the source and remember the
                  // subscription so that it can be torn down when the connection is disposed.
                  var subscription = _source.SubscribeSafe(_subject);
                  _connection = new Connection(this, subscription);
              }

              return _connection;
          }
      }

      /// <summary>
      /// Subscribes an observer to the observable sequence. No values from the underlying observable source will be received unless a connection was established through the Connect method.
      /// </summary>
      /// <param name="observer">Observer that will receive values from the underlying observable source when the current ConnectableObservable instance is connected through a call to Connect.</param>
      /// <returns>Disposable used to unsubscribe from the observable sequence.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
      public IDisposable Subscribe(IObserver<TResult> observer)
      {
          if (observer == null)
          {
              throw new ArgumentNullException("observer");
          }

          return _subject.SubscribeSafe(observer);
      }

      /// <summary>
      /// Handle returned by <see cref="Connect"/>; disposing it disposes the subscription of the subject to the source
      /// and clears the parent's current connection.
      /// </summary>
      private sealed class Connection : IDisposable
      {
          private readonly ConnectableObservable<TSource, TResult> _parent;

          // Set to null once the connection has been disposed, which makes Dispose idempotent.
          private IDisposable _subscription;

          /// <summary>
          /// Creates a connection handle for the given parent and source subscription.
          /// </summary>
          /// <param name="parent">Connectable observable that owns this connection.</param>
          /// <param name="subscription">Subscription of the parent's subject to the parent's source.</param>
          public Connection(ConnectableObservable<TSource, TResult> parent, IDisposable subscription)
          {
              _parent = parent;
              _subscription = subscription;
          }

          /// <summary>
          /// Disposes the source subscription and resets the parent so that a later call to Connect
          /// creates a new subscription. Calls after the first one do nothing.
          /// </summary>
          public void Dispose()
          {
              // Takes the same gate as Connect, so connecting and disconnecting are serialized.
              lock (_parent._gate)
              {
                  if (_subscription == null)
                  {
                      return;
                  }

                  _subscription.Dispose();
                  _subscription = null;
                  _parent._connection = null;
              }
          }
      }
  }
  78. }