Qbservable.cs 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. #if !NO_EXPRESSIONS
  5. #pragma warning disable 1591
  6. using System.Collections.Generic;
  7. using System.Linq;
  8. using System.Linq.Expressions;
  9. using System.Reflection;
  10. using System.Reactive.Concurrency;
  11. namespace System.Reactive.Linq
  12. {
  13. /// <summary>
  14. /// Provides a set of static methods for writing queries over observable sequences, allowing translation to a target query language.
  15. /// </summary>
  16. public static partial class Qbservable
  17. {
  18. /// <summary>
  19. /// Returns the input typed as an IObservable&lt;TSource&gt;.
  20. /// This operator is used to separate the part of the query that's captured as an expression tree from the part that's executed locally.
  21. /// </summary>
  22. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  23. /// <param name="source">An IQbservable&lt;TSource&gt; sequence to convert to an IObservable&lt;TSource&gt; sequence.</param>
  24. /// <returns>The original source object, but typed as an IObservable&lt;TSource&gt;.</returns>
  25. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  26. public static IObservable<TSource> AsObservable<TSource>(this IQbservable<TSource> source)
  27. {
  28. if (source == null)
  29. throw new ArgumentNullException("source");
  30. return source;
  31. }
  32. /// <summary>
  33. /// Converts an enumerable sequence to an observable sequence.
  34. /// </summary>
  35. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  36. /// <param name="source">Enumerable sequence to convert to an observable sequence.</param>
  37. /// <returns>The observable sequence whose elements are pulled from the given enumerable sequence.</returns>
  38. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  39. /// <remarks>This operator requires the source's <see cref="IQueryProvider"/> object (see <see cref="IQueryable.Provider"/>) to implement <see cref="IQbservableProvider"/>.</remarks>
  40. public static IQbservable<TSource> ToQbservable<TSource>(this IQueryable<TSource> source)
  41. {
  42. if (source == null)
  43. throw new ArgumentNullException("source");
  44. return ((IQbservableProvider)source.Provider).CreateQuery<TSource>(
  45. Expression.Call(
  46. null,
  47. #if CRIPPLED_REFLECTION
  48. InfoOf(() => Qbservable.ToQbservable<TSource>(default(IQueryable<TSource>))),
  49. #else
  50. ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
  51. #endif
  52. source.Expression
  53. )
  54. );
  55. }
  56. /// <summary>
  57. /// Converts an enumerable sequence to an observable sequence, using the specified scheduler to run the enumeration loop.
  58. /// </summary>
  59. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  60. /// <param name="source">Enumerable sequence to convert to an observable sequence.</param>
  61. /// <param name="scheduler">Scheduler to run the enumeration of the input sequence on.</param>
  62. /// <returns>The observable sequence whose elements are pulled from the given enumerable sequence.</returns>
  63. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
  64. /// <remarks>This operator requires the source's <see cref="IQueryProvider"/> object (see <see cref="IQueryable.Provider"/>) to implement <see cref="IQbservableProvider"/>.</remarks>
  65. public static IQbservable<TSource> ToQbservable<TSource>(this IQueryable<TSource> source, IScheduler scheduler)
  66. {
  67. if (source == null)
  68. throw new ArgumentNullException("source");
  69. if (scheduler == null)
  70. throw new ArgumentNullException("scheduler");
  71. return ((IQbservableProvider)source.Provider).CreateQuery<TSource>(
  72. Expression.Call(
  73. null,
  74. #if CRIPPLED_REFLECTION
  75. InfoOf(() => Qbservable.ToQbservable<TSource>(default(IQueryable<TSource>))),
  76. #else
  77. ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
  78. #endif
  79. source.Expression,
  80. Expression.Constant(scheduler)
  81. )
  82. );
  83. }
  84. internal static Expression GetSourceExpression<TSource>(IObservable<TSource> source)
  85. {
  86. var q = source as IQbservable<TSource>;
  87. if (q != null)
  88. return q.Expression;
  89. return Expression.Constant(source, typeof(IObservable<TSource>));
  90. }
  91. internal static Expression GetSourceExpression<TSource>(IEnumerable<TSource> source)
  92. {
  93. var q = source as IQueryable<TSource>;
  94. if (q != null)
  95. return q.Expression;
  96. return Expression.Constant(source, typeof(IEnumerable<TSource>));
  97. }
  98. internal static Expression GetSourceExpression<TSource>(IObservable<TSource>[] sources)
  99. {
  100. return Expression.NewArrayInit(
  101. typeof(IObservable<TSource>),
  102. sources.Select(source => GetSourceExpression(source))
  103. );
  104. }
  105. internal static Expression GetSourceExpression<TSource>(IEnumerable<TSource>[] sources)
  106. {
  107. return Expression.NewArrayInit(
  108. typeof(IEnumerable<TSource>),
  109. sources.Select(source => GetSourceExpression(source))
  110. );
  111. }
  112. internal static MethodInfo InfoOf<R>(Expression<Func<R>> f)
  113. {
  114. return ((MethodCallExpression)f.Body).Method;
  115. }
  116. }
  117. }
  118. #pragma warning restore 1591
  119. #endif