1
0

Subject.Extensions.cs 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Reactive.Concurrency;
  3. using System.Reactive.Linq;
  4. namespace System.Reactive.Subjects
  5. {
  6. /// <summary>
  7. /// Provides a set of static methods for creating subjects.
  8. /// </summary>
  9. public static class Subject
  10. {
  11. /// <summary>
  12. /// Creates a subject from the specified observer and observable.
  13. /// </summary>
  14. /// <typeparam name="TSource">The type of the elements received by the observer.</typeparam>
  15. /// <typeparam name="TResult">The type of the elements produced by the observable sequence.</typeparam>
  16. /// <param name="observer">The observer used to send messages to the subject.</param>
  17. /// <param name="observable">The observable used to subscribe to messages sent from the subject.</param>
  18. /// <returns>Subject implemented using the given observer and observable.</returns>
  19. /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="observable"/> is null.</exception>
  20. public static ISubject<TSource, TResult> Create<TSource, TResult>(IObserver<TSource> observer, IObservable<TResult> observable)
  21. {
  22. if (observer == null)
  23. throw new ArgumentNullException("observer");
  24. if (observable == null)
  25. throw new ArgumentNullException("observable");
  26. return new AnonymousSubject<TSource, TResult>(observer, observable);
  27. }
  28. /// <summary>
  29. /// Synchronizes the messages sent to the subject.
  30. /// </summary>
  31. /// <typeparam name="TSource">The type of the elements received by the subject.</typeparam>
  32. /// <typeparam name="TResult">The type of the elements produced by the subject.</typeparam>
  33. /// <param name="subject">The subject to synchronize.</param>
  34. /// <returns>Subject whose messages are synchronized.</returns>
  35. /// <exception cref="ArgumentNullException"><paramref name="subject"/> is null.</exception>
  36. public static ISubject<TSource, TResult> Synchronize<TSource, TResult>(ISubject<TSource, TResult> subject)
  37. {
  38. if (subject == null)
  39. throw new ArgumentNullException("subject");
  40. return new AnonymousSubject<TSource, TResult>(Observer.Synchronize(subject), subject);
  41. }
  42. /// <summary>
  43. /// Synchronizes the messages sent to the subject and notifies observers on the specified scheduler.
  44. /// </summary>
  45. /// <typeparam name="TSource">The type of the elements received by the subject.</typeparam>
  46. /// <typeparam name="TResult">The type of the elements produced by the subject.</typeparam>
  47. /// <param name="subject">The subject to synchronize.</param>
  48. /// <param name="scheduler">Scheduler to notify observers on.</param>
  49. /// <returns>Subject whose messages are synchronized and whose observers are notified on the given scheduler.</returns>
  50. /// <exception cref="ArgumentNullException"><paramref name="subject"/> or <paramref name="scheduler"/> is null.</exception>
  51. public static ISubject<TSource, TResult> Synchronize<TSource, TResult>(ISubject<TSource, TResult> subject, IScheduler scheduler)
  52. {
  53. if (subject == null)
  54. throw new ArgumentNullException("subject");
  55. if (scheduler == null)
  56. throw new ArgumentNullException("scheduler");
  57. return new AnonymousSubject<TSource, TResult>(Observer.Synchronize(subject), subject.ObserveOn(scheduler));
  58. }
  59. class AnonymousSubject<T, U> : ISubject<T, U>
  60. {
  61. private readonly IObserver<T> _observer;
  62. private readonly IObservable<U> _observable;
  63. public AnonymousSubject(IObserver<T> observer, IObservable<U> observable)
  64. {
  65. _observer = observer;
  66. _observable = observable;
  67. }
  68. public void OnCompleted()
  69. {
  70. _observer.OnCompleted();
  71. }
  72. public void OnError(Exception error)
  73. {
  74. if (error == null)
  75. throw new ArgumentNullException("error");
  76. _observer.OnError(error);
  77. }
  78. public void OnNext(T value)
  79. {
  80. _observer.OnNext(value);
  81. }
  82. public IDisposable Subscribe(IObserver<U> observer)
  83. {
  84. if (observer == null)
  85. throw new ArgumentNullException("observer");
  86. //
  87. // [OK] Use of unsafe Subscribe: non-pretentious wrapping of an observable sequence.
  88. //
  89. return _observable.Subscribe/*Unsafe*/(observer);
  90. }
  91. }
  92. }
  93. }