// File: Subject.Extensions.cs (7.9 KB)
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Reactive.Concurrency;
  3. using System.Reactive.Linq;
  4. namespace System.Reactive.Subjects
  5. {
  6. /// <summary>
  7. /// Provides a set of static methods for creating subjects.
  8. /// </summary>
  9. public static class Subject
  10. {
  /// <summary>
  /// Builds a subject by pairing an observer (the input side) with an observable (the output side).
  /// </summary>
  /// <typeparam name="TSource">Element type accepted by <paramref name="observer"/>.</typeparam>
  /// <typeparam name="TResult">Element type produced by <paramref name="observable"/>.</typeparam>
  /// <param name="observer">Observer that receives the messages sent to the returned subject.</param>
  /// <param name="observable">Observable that subscribers of the returned subject are subscribed to.</param>
  /// <returns>A subject backed by the supplied observer and observable.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="observable"/> is null.</exception>
  public static ISubject<TSource, TResult> Create<TSource, TResult>(IObserver<TSource> observer, IObservable<TResult> observable)
  {
      // Arguments are validated in declaration order, so a null observer is reported first.
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      if (observable == null)
      {
          throw new ArgumentNullException("observable");
      }

      ISubject<TSource, TResult> result = new AnonymousSubject<TSource, TResult>(observer, observable);
      return result;
  }
  /// <summary>
  /// Builds a subject whose input and output share one element type, by pairing an observer with an observable.
  /// </summary>
  /// <typeparam name="T">Element type accepted by <paramref name="observer"/> and produced by <paramref name="observable"/>.</typeparam>
  /// <param name="observer">Observer that receives the messages sent to the returned subject.</param>
  /// <param name="observable">Observable that subscribers of the returned subject are subscribed to.</param>
  /// <returns>A subject backed by the supplied observer and observable.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="observable"/> is null.</exception>
  public static ISubject<T> Create<T>(IObserver<T> observer, IObservable<T> observable)
  {
      // Arguments are validated in declaration order, so a null observer is reported first.
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      if (observable == null)
      {
          throw new ArgumentNullException("observable");
      }

      ISubject<T> result = new AnonymousSubject<T>(observer, observable);
      return result;
  }
  /// <summary>
  /// Wraps a subject so that the messages sent to it are synchronized.
  /// </summary>
  /// <remarks>
  /// Only the input side is wrapped: incoming notifications go through <c>Observer.Synchronize</c>,
  /// while subscriptions are handed to the original <paramref name="subject"/> unchanged.
  /// </remarks>
  /// <typeparam name="TSource">Element type received by the subject.</typeparam>
  /// <typeparam name="TResult">Element type produced by the subject.</typeparam>
  /// <param name="subject">The subject whose incoming messages should be synchronized.</param>
  /// <returns>A subject whose incoming messages are synchronized.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="subject"/> is null.</exception>
  public static ISubject<TSource, TResult> Synchronize<TSource, TResult>(ISubject<TSource, TResult> subject)
  {
      if (subject == null)
      {
          throw new ArgumentNullException("subject");
      }

      // The subject plays both roles: its observer half is synchronized, its observable half is reused as-is.
      var synchronizedInput = Observer.Synchronize(subject);
      return new AnonymousSubject<TSource, TResult>(synchronizedInput, subject);
  }
  /// <summary>
  /// Wraps a subject so that the messages sent to it are synchronized.
  /// </summary>
  /// <remarks>
  /// Only the input side is wrapped: incoming notifications go through <c>Observer.Synchronize</c>,
  /// while subscriptions are handed to the original <paramref name="subject"/> unchanged.
  /// </remarks>
  /// <typeparam name="TSource">Element type both received and produced by the subject.</typeparam>
  /// <param name="subject">The subject whose incoming messages should be synchronized.</param>
  /// <returns>A subject whose incoming messages are synchronized.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="subject"/> is null.</exception>
  public static ISubject<TSource> Synchronize<TSource>(ISubject<TSource> subject)
  {
      if (subject == null)
      {
          throw new ArgumentNullException("subject");
      }

      // The subject plays both roles: its observer half is synchronized, its observable half is reused as-is.
      var synchronizedInput = Observer.Synchronize(subject);
      return new AnonymousSubject<TSource>(synchronizedInput, subject);
  }
  /// <summary>
  /// Wraps a subject so that the messages sent to it are synchronized and its observers are notified on the given scheduler.
  /// </summary>
  /// <remarks>
  /// The input side goes through <c>Observer.Synchronize</c>; the output side is the original
  /// <paramref name="subject"/> with <c>ObserveOn</c> applied for <paramref name="scheduler"/>.
  /// </remarks>
  /// <typeparam name="TSource">Element type received by the subject.</typeparam>
  /// <typeparam name="TResult">Element type produced by the subject.</typeparam>
  /// <param name="subject">The subject whose incoming messages should be synchronized.</param>
  /// <param name="scheduler">Scheduler on which observers are notified.</param>
  /// <returns>A subject whose incoming messages are synchronized and whose observers are notified on the given scheduler.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="subject"/> or <paramref name="scheduler"/> is null.</exception>
  public static ISubject<TSource, TResult> Synchronize<TSource, TResult>(ISubject<TSource, TResult> subject, IScheduler scheduler)
  {
      // Arguments are validated in declaration order, so a null subject is reported first.
      if (subject == null)
      {
          throw new ArgumentNullException("subject");
      }

      if (scheduler == null)
      {
          throw new ArgumentNullException("scheduler");
      }

      // Input half first, then output half, matching the order in which the wrapper receives them.
      var synchronizedInput = Observer.Synchronize(subject);
      var scheduledOutput = subject.ObserveOn(scheduler);
      return new AnonymousSubject<TSource, TResult>(synchronizedInput, scheduledOutput);
  }
  /// <summary>
  /// Wraps a subject so that the messages sent to it are synchronized and its observers are notified on the given scheduler.
  /// </summary>
  /// <remarks>
  /// The input side goes through <c>Observer.Synchronize</c>; the output side is the original
  /// <paramref name="subject"/> with <c>ObserveOn</c> applied for <paramref name="scheduler"/>.
  /// </remarks>
  /// <typeparam name="TSource">Element type both received and produced by the subject.</typeparam>
  /// <param name="subject">The subject whose incoming messages should be synchronized.</param>
  /// <param name="scheduler">Scheduler on which observers are notified.</param>
  /// <returns>A subject whose incoming messages are synchronized and whose observers are notified on the given scheduler.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="subject"/> or <paramref name="scheduler"/> is null.</exception>
  public static ISubject<TSource> Synchronize<TSource>(ISubject<TSource> subject, IScheduler scheduler)
  {
      // Arguments are validated in declaration order, so a null subject is reported first.
      if (subject == null)
      {
          throw new ArgumentNullException("subject");
      }

      if (scheduler == null)
      {
          throw new ArgumentNullException("scheduler");
      }

      // Input half first, then output half, matching the order in which the wrapper receives them.
      var synchronizedInput = Observer.Synchronize(subject);
      var scheduledOutput = subject.ObserveOn(scheduler);
      return new AnonymousSubject<TSource>(synchronizedInput, scheduledOutput);
  }
  /// <summary>
  /// Subject assembled from a separate observer and observable: observer notifications are
  /// forwarded to the wrapped observer, and subscriptions are forwarded to the wrapped observable.
  /// </summary>
  /// <typeparam name="T">Element type accepted on the observer side.</typeparam>
  /// <typeparam name="U">Element type produced on the observable side.</typeparam>
  class AnonymousSubject<T, U> : ISubject<T, U>
  {
      // Receives every notification pushed into this subject.
      private readonly IObserver<T> _sink;

      // Sequence that subscribers of this subject are attached to.
      private readonly IObservable<U> _source;

      /// <summary>
      /// Stores the observer and observable that back this subject. No argument validation is done here.
      /// </summary>
      /// <param name="observer">Observer to forward notifications to.</param>
      /// <param name="observable">Observable to forward subscriptions to.</param>
      public AnonymousSubject(IObserver<T> observer, IObservable<U> observable)
      {
          _sink = observer;
          _source = observable;
      }

      /// <summary>
      /// Forwards the completion notification to the wrapped observer.
      /// </summary>
      public void OnCompleted()
      {
          _sink.OnCompleted();
      }

      /// <summary>
      /// Forwards an error notification to the wrapped observer.
      /// </summary>
      /// <param name="error">The exception to report.</param>
      /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
      public void OnError(Exception error)
      {
          // A null error is rejected here and never reaches the wrapped observer.
          if (error == null)
          {
              throw new ArgumentNullException("error");
          }

          _sink.OnError(error);
      }

      /// <summary>
      /// Forwards an element to the wrapped observer.
      /// </summary>
      /// <param name="value">The element to forward.</param>
      public void OnNext(T value)
      {
          _sink.OnNext(value);
      }

      /// <summary>
      /// Subscribes the given observer to the wrapped observable.
      /// </summary>
      /// <param name="observer">Observer to attach to the wrapped observable.</param>
      /// <returns>The disposable returned by the wrapped observable's <c>Subscribe</c> call.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
      public IDisposable Subscribe(IObserver<U> observer)
      {
          if (observer == null)
          {
              throw new ArgumentNullException("observer");
          }

          //
          // [OK] Use of unsafe Subscribe: this type is a plain, thin wrapper around the
          //      underlying observable sequence, so the direct call was accepted as intentional.
          //
          IDisposable subscription = _source.Subscribe/*Unsafe*/(observer);
          return subscription;
      }
  }
  /// <summary>
  /// Form of the anonymous subject for the case where the observer side and the observable side
  /// use the same element type; all behavior comes from the two-type-parameter base class.
  /// </summary>
  /// <typeparam name="T">Element type both accepted and produced by the subject.</typeparam>
  class AnonymousSubject<T> : AnonymousSubject<T, T>, ISubject<T>
  {
      /// <summary>
      /// Passes the observer and observable straight through to the base class.
      /// </summary>
      /// <param name="observer">Observer to forward notifications to.</param>
      /// <param name="observable">Observable to forward subscriptions to.</param>
      public AnonymousSubject(IObserver<T> observer, IObservable<T> observable) : base(observer, observable)
      {
      }
  }
  144. }
  145. }