ObservableQuery.cs 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Globalization;
  6. using System.Linq;
  7. using System.Linq.Expressions;
  8. using System.Reactive.Joins;
  9. using System.Reactive.Linq;
  10. using System.Reflection;
  11. namespace System.Reactive
  12. {
  /// <summary>
  /// Query provider used for locally executed observable queries. It wraps expression trees in
  /// <see cref="ObservableQuery{TSource}"/> instances and converts ToQueryable calls into a
  /// queryable backed by an enumerable view of the observable sequence.
  /// </summary>
  internal class ObservableQueryProvider : IQbservableProvider, IQueryProvider
  {
      // Open generic definition of Queryable.AsQueryable<T>(IEnumerable<T>), resolved on first use.
      private static MethodInfo _staticAsQueryable;

      private static MethodInfo AsQueryable
      {
          get
          {
              return _staticAsQueryable
                  ?? (_staticAsQueryable = Qbservable.InfoOf<object>(() => Queryable.AsQueryable<object>(null)).GetGenericMethodDefinition());
          }
      }

      /// <summary>
      /// Creates a query object representing the given expression tree.
      /// </summary>
      /// <typeparam name="TResult">Element type of the resulting sequence.</typeparam>
      /// <param name="expression">Expression tree whose type must be assignable to IObservable&lt;TResult&gt;.</param>
      /// <returns>A new <see cref="ObservableQuery{TSource}"/> wrapping <paramref name="expression"/>.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="expression"/> is null.</exception>
      /// <exception cref="ArgumentException">The expression's type is not an observable of <typeparamref name="TResult"/>.</exception>
      public IQbservable<TResult> CreateQuery<TResult>(Expression expression)
      {
          if (expression is null)
          {
              throw new ArgumentNullException(nameof(expression));
          }

          var requiredType = typeof(IObservable<TResult>);
          if (!requiredType.IsAssignableFrom(expression.Type))
          {
              throw new ArgumentException(Strings_Providers.INVALID_TREE_TYPE, nameof(expression));
          }

          return new ObservableQuery<TResult>(expression);
      }

      /// <summary>
      /// Handles the transition from IQbservable to IQueryable for local execution. A call such as
      ///
      ///   observable.AsQbservable().[operators].ToQueryable()
      ///
      /// is rewritten to
      ///
      ///   observable.AsQbservable().[operators].ToEnumerable().AsQueryable()
      ///
      /// so that a push-to-pull adapter sits between the two worlds.
      /// </summary>
      /// <exception cref="ArgumentException">The expression is not a call to Qbservable.ToQueryable.</exception>
      IQueryable<TElement> IQueryProvider.CreateQuery<TElement>(Expression expression)
      {
          var call = expression as MethodCallExpression;
          var isToQueryableCall =
              call != null &&
              call.Method.DeclaringType == typeof(Qbservable) &&
              call.Method.Name == nameof(Qbservable.ToQueryable);

          if (!isToQueryableCall)
          {
              throw new ArgumentException(Strings_Providers.EXPECTED_TOQUERYABLE_METHODCALL, nameof(expression));
          }

          // The first argument is the IQbservable<T> on the left-hand side of ToQueryable;
          // wrap it in ToEnumerable and then AsQueryable to obtain the local queryable.
          var observableSource = call.Arguments[0];
          var asQueryable = AsQueryable.MakeGenericMethod(typeof(TElement));
          var toEnumerable = typeof(Observable).GetMethod(nameof(Observable.ToEnumerable)).MakeGenericMethod(typeof(TElement));

          var enumerableCall = Expression.Call(toEnumerable, observableSource);
          var queryableCall = Expression.Call(asQueryable, enumerableCall);

          // Compile and run the wrapper so the caller receives the resulting IQueryable<T>;
          // subsequent Queryable operators are then handled by that queryable's own provider.
          var factory = Expression.Lambda<Func<IQueryable<TElement>>>(queryableCall).Compile();
          return factory();
      }

      /// <summary>Not implemented; always throws.</summary>
      IQueryable IQueryProvider.CreateQuery(Expression expression) => throw new NotImplementedException();

      /// <summary>Not implemented; always throws.</summary>
      TResult IQueryProvider.Execute<TResult>(Expression expression) => throw new NotImplementedException();

      /// <summary>Not implemented; always throws.</summary>
      object IQueryProvider.Execute(Expression expression) => throw new NotImplementedException();
  }
  /// <summary>
  /// Non-generic base of observable queries. Exposes the underlying source object (if any)
  /// and the expression tree describing the query.
  /// </summary>
  internal class ObservableQuery
  {
      protected object _source;
      protected Expression _expression;

      /// <summary>The object stored as the query's source; null when none has been set.</summary>
      public object Source => _source;

      /// <summary>The expression tree stored for this query.</summary>
      public Expression Expression => _expression;
  }
  104. internal class ObservableQuery<TSource> : ObservableQuery, IQbservable<TSource>
  105. {
  /// <summary>
  /// Creates a query over an existing observable. The query's expression is a constant
  /// node that refers to this very instance.
  /// </summary>
  /// <param name="source">Observable sequence to wrap.</param>
  internal ObservableQuery(IObservable<TSource> source)
  {
      _expression = Expression.Constant(this);
      _source = source;
  }

  /// <summary>
  /// Creates a query described only by an expression tree; no source is set at construction.
  /// </summary>
  /// <param name="expression">Expression tree representing the query.</param>
  internal ObservableQuery(Expression expression) => _expression = expression;
  /// <summary>Element type of the sequence, i.e. <typeparamref name="TSource"/>.</summary>
  public Type ElementType
  {
      get { return typeof(TSource); }
  }

  /// <summary>Query provider associated with this query; always <c>Qbservable.Provider</c>.</summary>
  public IQbservableProvider Provider
  {
      get { return Qbservable.Provider; }
  }
  /// <summary>
  /// Subscribes the observer to the underlying observable. When no source has been set yet,
  /// the expression tree is first rewritten by <c>ObservableRewriter</c>, compiled and invoked,
  /// and the resulting observable is stored as the source for later calls.
  /// </summary>
  /// <param name="observer">Observer to subscribe.</param>
  /// <returns>The disposable returned by the underlying observable's Subscribe call.</returns>
  public IDisposable Subscribe(IObserver<TSource> observer)
  {
      if (_source == null)
      {
          var rewritten = new ObservableRewriter().Visit(_expression);
          var factory = Expression.Lambda<Func<IObservable<TSource>>>(rewritten).Compile();
          _source = factory();
      }

      //
      // [OK] Plain Subscribe is used deliberately: this call maps directly onto the
      //      IObservable<T> behavior that the expression tree stands for.
      //
      var target = (IObservable<TSource>)_source;
      return target.Subscribe/*Unsafe*/(observer);
  }
  /// <summary>
  /// Returns a textual representation of the query. When the expression is a constant that
  /// refers to this instance, the source's string form (or "null" if there is no source) is
  /// returned; otherwise the expression tree's string form is returned.
  /// </summary>
  public override string ToString()
  {
      var constant = _expression as ConstantExpression;
      var refersToSelf = constant != null && constant.Value == this;
      if (!refersToSelf)
      {
          return _expression.ToString();
      }

      return _source != null ? _source.ToString() : "null";
  }
  143. private class ObservableRewriter : ExpressionVisitor
  144. {
  /// <summary>
  /// Replaces constants holding an <see cref="ObservableQuery"/>: by a constant of the query's
  /// source when it has one, otherwise by the rewritten form of the query's own expression.
  /// Any other constant is returned unchanged.
  /// </summary>
  protected override Expression VisitConstant(ConstantExpression/*!*/ node)
  {
      var query = node.Value as ObservableQuery;
      if (query == null)
      {
          return node;
      }

      var source = query.Source;
      return source != null
          ? Expression.Constant(source)
          : Visit(query.Expression);
  }
  /// <summary>
  /// Rewrites method calls found in the query expression tree:
  /// - "Then" and "And" calls on types deriving directly from QueryablePattern are re-emitted
  ///   against the rewritten receiver;
  /// - calls whose first parameter is an IQbservableProvider or an IQbservable are treated as
  ///   query operators and replaced by the matching method found by FindObservableMethod;
  /// - everything else is handled by the base visitor.
  /// </summary>
  /// <param name="node">Method call to rewrite.</param>
  /// <returns>The rewritten call, or the original node when execution is left to another provider.</returns>
  protected override Expression VisitMethodCall(MethodCallExpression/*!*/ node)
  {
      var method = node.Method;
      // NOTE(review): DeclaringType is dereferenced below without a null check - confirm that
      // methods without a declaring type cannot reach this visitor.
      var declaringType = method.DeclaringType;
  #if (CRIPPLED_REFLECTION && HAS_WINRT)
      var baseType = declaringType.GetTypeInfo().BaseType;
  #else
      var baseType = declaringType.BaseType;
  #endif
      if (baseType == typeof(QueryablePattern))
      {
          if (method.Name == "Then")
          {
              //
              // Retarget Then to the corresponding pattern. Recursive visit of the lhs will rewrite
              // the chain of And operators.
              //
              var pattern = Visit(node.Object);
              // Arguments are unquoted after visiting, so a quoted lambda is passed on as the lambda itself.
              var arguments = node.Arguments.Select(arg => Unquote(Visit(arg))).ToArray();
              var then = Expression.Call(pattern, "Then", method.GetGenericArguments(), arguments);
              return then;
          }
          if (method.Name == "And")
          {
              //
              // Retarget And to the corresponding pattern.
              //
              var lhs = Visit(node.Object);
              var arguments = node.Arguments.Select(arg => Visit(arg)).ToArray();
              var and = Expression.Call(lhs, "And", method.GetGenericArguments(), arguments);
              return and;
          }
          // Any other method on a pattern type falls through to the base visitor at the end.
      }
      else
      {
          var arguments = node.Arguments.AsEnumerable();
          //
          // Checking for an IQbservable operator, being either:
          // - an extension method on IQbservableProvider
          // - an extension method on IQbservable<T>
          //
          var isOperator = false;
          var firstParameter = method.GetParameters().FirstOrDefault();
          if (firstParameter != null)
          {
              var firstParameterType = firstParameter.ParameterType;
              //
              // Operators like Qbservable.Amb have an n-ary form that take in an IQbservableProvider
              // as the first argument. In such a case we need to make sure that the given provider is
              // the one targeting regular Observable. If not, we keep the subtree as-is and let that
              // provider handle the execution.
              //
              if (firstParameterType == typeof(IQbservableProvider))
              {
                  isOperator = true;
                  //
                  // Since we could be inside a lambda expression where one tries to obtain a query
                  // provider, or that provider could be stored in an outer variable, we need to
                  // evaluate the expression to obtain an IQbservableProvider object.
                  //
                  // The provider expression is compiled and invoked right here, during the rewrite.
                  var provider = Expression.Lambda<Func<IQbservableProvider>>(Visit(node.Arguments[0])).Compile()();
                  //
                  // Let's see whether the ObservableQuery provider is targeted. This one always goes
                  // to local execution. E.g.:
                  //
                  // var xs = Observable.Return(1).AsQbservable();
                  // var ys = Observable.Return(2).AsQbservable();
                  // var zs = Observable.Return(3).AsQbservable();
                  //
                  // var res = Qbservable.Provider.Amb(xs, ys, zs);
                  //           ^^^^^^^^^^^^^^^^^^^
                  //
                  if (provider is ObservableQueryProvider)
                  {
                      //
                      // For further rewrite, simply ignore the query provider argument now to match
                      // up with the Observable signature. E.g.:
                      //
                      // var res = Qbservable.Provider.Amb(xs, ys, zs);
                      //         = Qbservable.Amb(Qbservable.Provider, xs, ys, zs)
                      //                          ^^^^^^^^^^^^^^^^^^^
                      // ->
                      // var res = Observable.Amb(xs, ys, zs);
                      //
                      arguments = arguments.Skip(1);
                  }
                  else
                  {
                      //
                      // We've hit an unknown provider and will defer further execution to it. Upon
                      // calling Subscribe to the node's output, that provider will take care of it.
                      //
                      return node;
                  }
              }
              else if (typeof(IQbservable).IsAssignableFrom(firstParameterType))
              {
                  isOperator = true;
              }
          }
          if (isOperator)
          {
              // Rewrite the remaining arguments, then bind to the matching target method.
              var args = VisitQbservableOperatorArguments(method, arguments);
              return FindObservableMethod(method, args);
          }
      }
      return base.VisitMethodCall(node);
  }
  /// <summary>
  /// Returns lambda expressions unchanged; their bodies are not traversed by this visitor.
  /// </summary>
  #if NO_VISITLAMBDAOFT
  protected override Expression VisitLambda(LambdaExpression node)
  #else
  protected override Expression VisitLambda<T>(Expression<T> node)
  #endif
      => node;
  /// <summary>
  /// Rewrites the arguments of a query operator call.
  /// </summary>
  /// <param name="method">Operator method being rewritten.</param>
  /// <param name="arguments">Arguments of the call; may be a lazily evaluated sequence.</param>
  /// <returns>
  /// A new list with the visited arguments. For a generic method named "When" whose last argument
  /// is an array initializer, the list contains a single Plan&lt;TResult&gt;[] initializer built
  /// from the visited array elements.
  /// </returns>
  private IList<Expression> VisitQbservableOperatorArguments(MethodInfo method, IEnumerable<Expression> arguments)
  {
      // Materialize once so the incoming sequence is enumerated a single time and can be
      // inspected safely even when it is empty.
      var argumentList = arguments.ToList();

      //
      // Recognize the Qbservable.When<TResult>(IQbservableProvider, QueryablePlan<TResult>[])
      // overload in order to substitute the array with a Plan<TResult>[].
      //
      // The guards on genericity and argument count keep a non-generic or parameterless method
      // that merely happens to be called "When" on the regular path below instead of failing here.
      //
      if (method.Name == "When" && method.IsGenericMethod && argumentList.Count > 0)
      {
          var lastArgument = argumentList[argumentList.Count - 1];
          if (lastArgument.NodeType == ExpressionType.NewArrayInit)
          {
              var paramsArray = (NewArrayExpression)lastArgument;
              return new List<Expression>
              {
                  Expression.NewArrayInit(
                      typeof(Plan<>).MakeGenericType(method.GetGenericArguments()[0]),
                      paramsArray.Expressions.Select(param => Visit(param))
                  )
              };
          }
      }

      return argumentList.Select(arg => Visit(arg)).ToList();
  }
  /// <summary>
  /// Minimal lazy holder: the factory runs on the first read of <see cref="Value"/> and its
  /// result is cached. Reads are serialized by a private lock.
  /// </summary>
  /// <typeparam name="T">Type of the lazily created value.</typeparam>
  private class Lazy<T>
  {
      // Dedicated lock object; the factory delegate is supplied by the caller and is
      // therefore not a safe object to lock on.
      private readonly object _gate = new object();
      private readonly Func<T> _factory;
      private T _value;
      private bool _initialized;

      /// <summary>Creates the holder.</summary>
      /// <param name="factory">Function producing the value on first access.</param>
      /// <exception cref="ArgumentNullException"><paramref name="factory"/> is null.</exception>
      public Lazy(Func<T> factory)
      {
          if (factory == null)
          {
              throw new ArgumentNullException(nameof(factory));
          }

          _factory = factory;
      }

      /// <summary>
      /// Gets the value, invoking the factory if it has not completed successfully before.
      /// If the factory throws, nothing is cached and the next read invokes it again.
      /// </summary>
      public T Value
      {
          get
          {
              lock (_gate)
              {
                  if (!_initialized)
                  {
                      _value = _factory();
                      _initialized = true;
                  }

                  return _value;
              }
          }
      }
  }
  // Public static methods of Observable grouped by name; built on first use.
  private static Lazy<ILookup<string, MethodInfo>> _observableMethods = new Lazy<ILookup<string, MethodInfo>>(() => GetMethods(typeof(Observable)));

  /// <summary>
  /// Builds a call to the local implementation of a query operator.
  /// </summary>
  /// <param name="method">Operator method found in the expression tree.</param>
  /// <param name="arguments">Rewritten arguments; entries are unquoted in place.</param>
  /// <returns>A static call to the matching method on the implementation type.</returns>
  /// <exception cref="InvalidOperationException">No method with a matching name and arguments exists.</exception>
  private static MethodCallExpression FindObservableMethod(MethodInfo method, IList<Expression> arguments)
  {
      //
      // Decide which type hosts the implementation: Observable for Qbservable operators,
      // otherwise the declaring type itself or the type named by its
      // LocalQueryMethodImplementationTypeAttribute.
      //
      Type implementationType;
      ILookup<string, MethodInfo> candidatesByName;

      if (method.DeclaringType != typeof(Qbservable))
      {
          implementationType = method.DeclaringType;
  #if (CRIPPLED_REFLECTION && HAS_WINRT)
          var implementationInfo = implementationType.GetTypeInfo();
          if (implementationInfo.IsDefined(typeof(LocalQueryMethodImplementationTypeAttribute), false))
          {
              var redirect = (LocalQueryMethodImplementationTypeAttribute)implementationInfo.GetCustomAttributes(typeof(LocalQueryMethodImplementationTypeAttribute), false).Single();
              implementationType = redirect.TargetType;
          }
  #else
          if (implementationType.IsDefined(typeof(LocalQueryMethodImplementationTypeAttribute), false))
          {
              var redirect = (LocalQueryMethodImplementationTypeAttribute)implementationType.GetCustomAttributes(typeof(LocalQueryMethodImplementationTypeAttribute), false)[0];
              implementationType = redirect.TargetType;
          }
  #endif
          candidatesByName = GetMethods(implementationType);
      }
      else
      {
          implementationType = typeof(Observable);
          candidatesByName = _observableMethods.Value;
      }

      //
      // Among the methods sharing the operator's name, take the first whose signature
      // accepts all arguments.
      //
      var genericArguments = method.IsGenericMethod ? method.GetGenericArguments() : null;

      MethodInfo match = null;
      foreach (var candidate in candidatesByName[method.Name])
      {
          if (ArgsMatch(candidate, arguments, genericArguments))
          {
              match = candidate;
              break;
          }
      }

      if (match == null)
      {
          throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Strings_Providers.NO_MATCHING_METHOD_FOUND, method.Name, implementationType.Name));
      }

      //
      // Close the method over the original generic arguments.
      //
      if (genericArguments != null)
      {
          match = match.MakeGenericMethod(genericArguments);
      }

      //
      // Strip quotes so that Expression<Func<...>> arguments line up with Func<...> parameters.
      //
      var parameterCount = match.GetParameters().Length;
      for (var index = 0; index < parameterCount; index++)
      {
          arguments[index] = Unquote(arguments[index]);
      }

      //
      // Emit the static call to the method that was found.
      //
      return Expression.Call(null, match, arguments);
  }
  /// <summary>
  /// Collects the public static methods of a type, grouped by method name.
  /// </summary>
  /// <param name="type">Type to inspect.</param>
  /// <returns>A lookup from method name to the methods carrying that name.</returns>
  private static ILookup<string, MethodInfo> GetMethods(Type type)
  {
  #if (CRIPPLED_REFLECTION && HAS_WINRT)
      var publicStatics = type.GetTypeInfo().DeclaredMethods.Where(m => m.IsStatic && m.IsPublic);
  #else
      var publicStatics = type.GetMethods(BindingFlags.Static | BindingFlags.Public);
  #endif
      return publicStatics.ToLookup(m => m.Name);
  }
  /// <summary>
  /// Determines whether a candidate method can be called with the given arguments.
  /// </summary>
  /// <param name="method">Candidate method; may be an open generic method definition.</param>
  /// <param name="arguments">Arguments to bind; quoted lambdas are also tried in unquoted form.</param>
  /// <param name="typeArgs">Generic arguments of the original operator, or null when it is not generic.</param>
  /// <returns>
  /// true when parameter count, genericity and parameter types are all compatible; false
  /// otherwise, including when <paramref name="typeArgs"/> cannot be applied to the candidate.
  /// </returns>
  private static bool ArgsMatch(MethodInfo method, IList<Expression> arguments, Type[] typeArgs)
  {
      //
      // Number of parameters should match. Notice we've sanitized IQbservableProvider "this"
      // parameters first (see VisitMethodCall).
      //
      var parameters = method.GetParameters();
      if (parameters.Length != arguments.Count)
      {
          return false;
      }

      //
      // Genericity should match too.
      //
      if (!method.IsGenericMethod && typeArgs != null && typeArgs.Length > 0)
      {
          return false;
      }

      //
      // Reconstruct the generic method if needed.
      //
      if (method.IsGenericMethodDefinition)
      {
          if (typeArgs == null)
          {
              return false;
          }

          if (method.GetGenericArguments().Length != typeArgs.Length)
          {
              return false;
          }

          MethodInfo constructed;
          try
          {
              constructed = method.MakeGenericMethod(typeArgs);
          }
          catch (ArgumentException)
          {
              //
              // The type arguments cannot be applied to this candidate (for instance because
              // they violate its generic constraints), so it is simply not a match; other
              // overloads may still be.
              //
              return false;
          }

          parameters = constructed.GetParameters();
      }

      //
      // Check compatibility for the parameter types.
      //
      for (int i = 0, n = arguments.Count; i < n; i++)
      {
          var parameterType = parameters[i].ParameterType;
          var argument = arguments[i];

          //
          // For operators that take a function (like Where, Select), we'll be faced
          // with a quoted argument and a discrepancy between Expression<Func<...>>
          // and the underlying Func<...>.
          //
          if (!parameterType.IsAssignableFrom(argument.Type))
          {
              argument = Unquote(argument);
              if (!parameterType.IsAssignableFrom(argument.Type))
              {
                  return false;
              }
          }
      }

      return true;
  }
  /// <summary>
  /// Strips every outer Quote node from an expression.
  /// </summary>
  /// <param name="expression">Expression that may be wrapped in one or more quotes.</param>
  /// <returns>The innermost operand that is not itself a Quote node.</returns>
  private static Expression Unquote(Expression expression)
  {
      var current = expression;
      while (current.NodeType == ExpressionType.Quote)
      {
          var quote = (UnaryExpression)current;
          current = quote.Operand;
      }

      return current;
  }
  459. }
  460. }
  461. }