Subject.Extensions.cs 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Linq;
  6. namespace System.Reactive.Subjects
  7. {
  /// <summary>
  /// Static factory and helper methods for building subjects and for wrapping existing ones.
  /// </summary>
  public static class Subject
  {
      /// <summary>
      /// Builds a subject out of an observer/observable pair: notifications sent to the subject are
      /// forwarded to <paramref name="observer"/>, and subscriptions are made on <paramref name="observable"/>.
      /// </summary>
      /// <typeparam name="TSource">The type of the elements received by the observer.</typeparam>
      /// <typeparam name="TResult">The type of the elements produced by the observable sequence.</typeparam>
      /// <param name="observer">The observer that receives the messages sent to the subject.</param>
      /// <param name="observable">The observable that subscribers of the subject are attached to.</param>
      /// <returns>A subject backed by the given observer and observable.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="observable"/> is <c>null</c>.</exception>
      public static ISubject<TSource, TResult> Create<TSource, TResult>(IObserver<TSource> observer, IObservable<TResult> observable)
      {
          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (observable == null)
          {
              throw new ArgumentNullException(nameof(observable));
          }

          return new AnonymousSubject<TSource, TResult>(observer, observable);
      }

      /// <summary>
      /// Builds a subject out of an observer/observable pair that share a single element type.
      /// </summary>
      /// <typeparam name="T">The type of the elements received by the observer and produced by the observable sequence.</typeparam>
      /// <param name="observer">The observer that receives the messages sent to the subject.</param>
      /// <param name="observable">The observable that subscribers of the subject are attached to.</param>
      /// <returns>A subject backed by the given observer and observable.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="observable"/> is <c>null</c>.</exception>
      public static ISubject<T> Create<T>(IObserver<T> observer, IObservable<T> observable)
      {
          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (observable == null)
          {
              throw new ArgumentNullException(nameof(observable));
          }

          return new AnonymousSubject<T>(observer, observable);
      }

      /// <summary>
      /// Wraps a subject so that the messages sent to it are synchronized.
      /// </summary>
      /// <typeparam name="TSource">The type of the elements received by the subject.</typeparam>
      /// <typeparam name="TResult">The type of the elements produced by the subject.</typeparam>
      /// <param name="subject">The subject to synchronize.</param>
      /// <returns>A subject whose incoming messages are synchronized.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="subject"/> is <c>null</c>.</exception>
      public static ISubject<TSource, TResult> Synchronize<TSource, TResult>(ISubject<TSource, TResult> subject)
      {
          if (subject == null)
          {
              throw new ArgumentNullException(nameof(subject));
          }

          // Only the observer side is wrapped; subscriptions still go straight to the original subject.
          IObserver<TSource> synchronizedInput = Observer.Synchronize(subject);

          return new AnonymousSubject<TSource, TResult>(synchronizedInput, subject);
      }

      /// <summary>
      /// Wraps a subject so that the messages sent to it are synchronized.
      /// </summary>
      /// <typeparam name="TSource">The type of the elements received and produced by the subject.</typeparam>
      /// <param name="subject">The subject to synchronize.</param>
      /// <returns>A subject whose incoming messages are synchronized.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="subject"/> is <c>null</c>.</exception>
      public static ISubject<TSource> Synchronize<TSource>(ISubject<TSource> subject)
      {
          if (subject == null)
          {
              throw new ArgumentNullException(nameof(subject));
          }

          // Only the observer side is wrapped; subscriptions still go straight to the original subject.
          IObserver<TSource> synchronizedInput = Observer.Synchronize(subject);

          return new AnonymousSubject<TSource>(synchronizedInput, subject);
      }

      /// <summary>
      /// Wraps a subject so that the messages sent to it are synchronized and its observers are
      /// notified on the specified scheduler.
      /// </summary>
      /// <typeparam name="TSource">The type of the elements received by the subject.</typeparam>
      /// <typeparam name="TResult">The type of the elements produced by the subject.</typeparam>
      /// <param name="subject">The subject to synchronize.</param>
      /// <param name="scheduler">The scheduler on which observers are notified.</param>
      /// <returns>A subject whose incoming messages are synchronized and whose observers are notified on the given scheduler.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="subject"/> or <paramref name="scheduler"/> is <c>null</c>.</exception>
      public static ISubject<TSource, TResult> Synchronize<TSource, TResult>(ISubject<TSource, TResult> subject, IScheduler scheduler)
      {
          if (subject == null)
          {
              throw new ArgumentNullException(nameof(subject));
          }

          if (scheduler == null)
          {
              throw new ArgumentNullException(nameof(scheduler));
          }

          // Input side: synchronized observer. Output side: the subject observed on the scheduler.
          IObserver<TSource> synchronizedInput = Observer.Synchronize(subject);
          IObservable<TResult> scheduledOutput = subject.ObserveOn(scheduler);

          return new AnonymousSubject<TSource, TResult>(synchronizedInput, scheduledOutput);
      }

      /// <summary>
      /// Wraps a subject so that the messages sent to it are synchronized and its observers are
      /// notified on the specified scheduler.
      /// </summary>
      /// <typeparam name="TSource">The type of the elements received and produced by the subject.</typeparam>
      /// <param name="subject">The subject to synchronize.</param>
      /// <param name="scheduler">The scheduler on which observers are notified.</param>
      /// <returns>A subject whose incoming messages are synchronized and whose observers are notified on the given scheduler.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="subject"/> or <paramref name="scheduler"/> is <c>null</c>.</exception>
      public static ISubject<TSource> Synchronize<TSource>(ISubject<TSource> subject, IScheduler scheduler)
      {
          if (subject == null)
          {
              throw new ArgumentNullException(nameof(subject));
          }

          if (scheduler == null)
          {
              throw new ArgumentNullException(nameof(scheduler));
          }

          // Input side: synchronized observer. Output side: the subject observed on the scheduler.
          IObserver<TSource> synchronizedInput = Observer.Synchronize(subject);
          IObservable<TSource> scheduledOutput = subject.ObserveOn(scheduler);

          return new AnonymousSubject<TSource>(synchronizedInput, scheduledOutput);
      }

      /// <summary>
      /// Subject that simply pairs an observer (input side) with an observable (output side):
      /// every notification is forwarded to the observer, every subscription to the observable.
      /// </summary>
      /// <typeparam name="TInput">The type of the elements accepted by the subject.</typeparam>
      /// <typeparam name="TOutput">The type of the elements produced by the subject.</typeparam>
      private class AnonymousSubject<TInput, TOutput> : ISubject<TInput, TOutput>
      {
          private readonly IObserver<TInput> _input;
          private readonly IObservable<TOutput> _output;

          /// <summary>
          /// Stores the pair as given; callers are expected to have validated both arguments.
          /// </summary>
          public AnonymousSubject(IObserver<TInput> observer, IObservable<TOutput> observable)
          {
              _input = observer;
              _output = observable;
          }

          /// <summary>
          /// Forwards the completion notification to the wrapped observer.
          /// </summary>
          public void OnCompleted()
          {
              _input.OnCompleted();
          }

          /// <summary>
          /// Forwards the error notification to the wrapped observer.
          /// </summary>
          /// <param name="error">The exception to forward.</param>
          /// <exception cref="ArgumentNullException"><paramref name="error"/> is <c>null</c>.</exception>
          public void OnError(Exception error)
          {
              if (error == null)
              {
                  throw new ArgumentNullException(nameof(error));
              }

              _input.OnError(error);
          }

          /// <summary>
          /// Forwards the element to the wrapped observer.
          /// </summary>
          /// <param name="value">The element to forward.</param>
          public void OnNext(TInput value)
          {
              _input.OnNext(value);
          }

          /// <summary>
          /// Subscribes the given observer to the wrapped observable.
          /// </summary>
          /// <param name="observer">The observer to subscribe.</param>
          /// <returns>The disposable returned by the wrapped observable's <c>Subscribe</c> call.</returns>
          /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
          public IDisposable Subscribe(IObserver<TOutput> observer)
          {
              if (observer == null)
              {
                  throw new ArgumentNullException(nameof(observer));
              }

              //
              // [OK] Use of unsafe Subscribe: this type is only a thin wrapper around the
              //      observable sequence, so the subscription is passed through as-is.
              //
              return _output.Subscribe/*Unsafe*/(observer);
          }
      }

      /// <summary>
      /// Variant of the anonymous subject whose input and output element types are the same.
      /// </summary>
      /// <typeparam name="T">The type of the elements accepted and produced by the subject.</typeparam>
      private sealed class AnonymousSubject<T> : AnonymousSubject<T, T>, ISubject<T>
      {
          /// <summary>
          /// Passes the observer/observable pair straight to the base class.
          /// </summary>
          public AnonymousSubject(IObserver<T> observer, IObservable<T> observable)
              : base(observer, observable)
          {
          }
      }
  }
  141. }