// QueryLanguage.Binding.cs
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Reactive.Concurrency;
  3. using System.Reactive.Disposables;
  4. using System.Reactive.Subjects;
  5. namespace System.Reactive.Linq
  6. {
  7. using ObservableImpl;
  8. internal partial class QueryLanguage
  9. {
  10. #region + Multicast +
  11. public virtual IConnectableObservable<TResult> Multicast<TSource, TResult>(IObservable<TSource> source, ISubject<TSource, TResult> subject)
  12. {
  13. return new ConnectableObservable<TSource, TResult>(source, subject);
  14. }
  15. public virtual IObservable<TResult> Multicast<TSource, TIntermediate, TResult>(IObservable<TSource> source, Func<ISubject<TSource, TIntermediate>> subjectSelector, Func<IObservable<TIntermediate>, IObservable<TResult>> selector)
  16. {
  17. return new Multicast<TSource, TIntermediate, TResult>(source, subjectSelector, selector);
  18. }
  19. #endregion
  20. #region + Publish +
  21. public virtual IConnectableObservable<TSource> Publish<TSource>(IObservable<TSource> source)
  22. {
  23. return source.Multicast(new Subject<TSource>());
  24. }
  25. public virtual IObservable<TResult> Publish<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector)
  26. {
  27. return source.Multicast(() => new Subject<TSource>(), selector);
  28. }
  29. public virtual IConnectableObservable<TSource> Publish<TSource>(IObservable<TSource> source, TSource initialValue)
  30. {
  31. return source.Multicast(new BehaviorSubject<TSource>(initialValue));
  32. }
  33. public virtual IObservable<TResult> Publish<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector, TSource initialValue)
  34. {
  35. return source.Multicast(() => new BehaviorSubject<TSource>(initialValue), selector);
  36. }
  37. #endregion
  38. #region + PublishLast +
  39. public virtual IConnectableObservable<TSource> PublishLast<TSource>(IObservable<TSource> source)
  40. {
  41. return source.Multicast(new AsyncSubject<TSource>());
  42. }
  43. public virtual IObservable<TResult> PublishLast<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector)
  44. {
  45. return source.Multicast(() => new AsyncSubject<TSource>(), selector);
  46. }
  47. #endregion
  48. #region + RefCount +
  49. public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source)
  50. {
  51. return new RefCount<TSource>(source);
  52. }
  53. #endregion
  54. #region + Replay +
  55. public virtual IConnectableObservable<TSource> Replay<TSource>(IObservable<TSource> source)
  56. {
  57. return source.Multicast(new ReplaySubject<TSource>());
  58. }
  59. public virtual IConnectableObservable<TSource> Replay<TSource>(IObservable<TSource> source, IScheduler scheduler)
  60. {
  61. return source.Multicast(new ReplaySubject<TSource>(scheduler));
  62. }
  63. public virtual IObservable<TResult> Replay<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector)
  64. {
  65. return source.Multicast(() => new ReplaySubject<TSource>(), selector);
  66. }
  67. public virtual IObservable<TResult> Replay<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector, IScheduler scheduler)
  68. {
  69. return source.Multicast(() => new ReplaySubject<TSource>(scheduler), selector);
  70. }
  71. public virtual IConnectableObservable<TSource> Replay<TSource>(IObservable<TSource> source, TimeSpan window)
  72. {
  73. return source.Multicast(new ReplaySubject<TSource>(window));
  74. }
  75. public virtual IObservable<TResult> Replay<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector, TimeSpan window)
  76. {
  77. return source.Multicast(() => new ReplaySubject<TSource>(window), selector);
  78. }
  79. public virtual IConnectableObservable<TSource> Replay<TSource>(IObservable<TSource> source, TimeSpan window, IScheduler scheduler)
  80. {
  81. return source.Multicast(new ReplaySubject<TSource>(window, scheduler));
  82. }
  83. public virtual IObservable<TResult> Replay<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector, TimeSpan window, IScheduler scheduler)
  84. {
  85. return source.Multicast(() => new ReplaySubject<TSource>(window, scheduler), selector);
  86. }
  87. public virtual IConnectableObservable<TSource> Replay<TSource>(IObservable<TSource> source, int bufferSize, IScheduler scheduler)
  88. {
  89. return source.Multicast(new ReplaySubject<TSource>(bufferSize, scheduler));
  90. }
  91. public virtual IObservable<TResult> Replay<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector, int bufferSize, IScheduler scheduler)
  92. {
  93. return source.Multicast(() => new ReplaySubject<TSource>(bufferSize, scheduler), selector);
  94. }
  95. public virtual IConnectableObservable<TSource> Replay<TSource>(IObservable<TSource> source, int bufferSize)
  96. {
  97. return source.Multicast(new ReplaySubject<TSource>(bufferSize));
  98. }
  99. public virtual IObservable<TResult> Replay<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector, int bufferSize)
  100. {
  101. return source.Multicast(() => new ReplaySubject<TSource>(bufferSize), selector);
  102. }
  103. public virtual IConnectableObservable<TSource> Replay<TSource>(IObservable<TSource> source, int bufferSize, TimeSpan window)
  104. {
  105. return source.Multicast(new ReplaySubject<TSource>(bufferSize, window));
  106. }
  107. public virtual IObservable<TResult> Replay<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector, int bufferSize, TimeSpan window)
  108. {
  109. return source.Multicast(() => new ReplaySubject<TSource>(bufferSize, window), selector);
  110. }
  111. public virtual IConnectableObservable<TSource> Replay<TSource>(IObservable<TSource> source, int bufferSize, TimeSpan window, IScheduler scheduler)
  112. {
  113. return source.Multicast(new ReplaySubject<TSource>(bufferSize, window, scheduler));
  114. }
  115. public virtual IObservable<TResult> Replay<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector, int bufferSize, TimeSpan window, IScheduler scheduler)
  116. {
  117. return source.Multicast(() => new ReplaySubject<TSource>(bufferSize, window, scheduler), selector);
  118. }
  119. #endregion
  120. }
  121. }