ObservableQuery.cs 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Collections.Generic;
  3. using System.Globalization;
  4. using System.Linq;
  5. using System.Linq.Expressions;
  6. using System.Reactive.Joins;
  7. using System.Reactive.Linq;
  8. using System.Reflection;
  9. namespace System.Reactive
  10. {
  /// <summary>
  /// Query provider that materializes expression trees as <c>ObservableQuery&lt;T&gt;</c> objects and
  /// bridges to <c>IQueryable&lt;T&gt;</c> for the <c>ToQueryable</c> case. The non-generic
  /// <c>IQueryProvider</c> members and both <c>Execute</c> overloads are not implemented.
  /// </summary>
  internal class ObservableQueryProvider : IQbservableProvider, IQueryProvider
  {
      // Cached open generic definition of Queryable.AsQueryable<T>; filled in on first use.
      private static MethodInfo s_AsQueryable;

      /// <summary>
      /// Gets the generic method definition of <c>Queryable.AsQueryable&lt;T&gt;</c>, resolving it on first access.
      /// </summary>
      private static MethodInfo AsQueryable
      {
          get
          {
              return s_AsQueryable
                  ?? (s_AsQueryable = Qbservable.InfoOf<object>(() => Queryable.AsQueryable<object>(null)).GetGenericMethodDefinition());
          }
      }

      /// <summary>
      /// Wraps the given expression tree in a new query object.
      /// </summary>
      /// <param name="expression">Expression tree whose type must be assignable to <c>IObservable&lt;TResult&gt;</c>.</param>
      /// <returns>A query object carrying the expression.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="expression"/> is null.</exception>
      /// <exception cref="ArgumentException">The expression's type is not assignable to <c>IObservable&lt;TResult&gt;</c>.</exception>
      public IQbservable<TResult> CreateQuery<TResult>(Expression expression)
      {
          if (expression == null)
          {
              throw new ArgumentNullException("expression");
          }

          var producesObservable = typeof(IObservable<TResult>).IsAssignableFrom(expression.Type);
          if (!producesObservable)
          {
              throw new ArgumentException(Strings_Providers.INVALID_TREE_TYPE, "expression");
          }

          return new ObservableQuery<TResult>(expression);
      }

      /// <summary>
      /// Handles the hand-over from IQbservable to IQueryable for local execution. The incoming
      /// expression must be a call to <c>Qbservable.ToQueryable</c>:
      ///
      ///     observable.AsQbservable().&lt;operators&gt;.ToQueryable()
      ///
      /// which is rewritten, with the push-to-pull adapter in the middle, to:
      ///
      ///     observable.AsQbservable().&lt;operators&gt;.ToEnumerable().AsQueryable()
      /// </summary>
      /// <exception cref="ArgumentException">The expression is not a call to <c>Qbservable.ToQueryable</c>.</exception>
      IQueryable<TElement> IQueryProvider.CreateQuery<TElement>(Expression expression)
      {
          var call = expression as MethodCallExpression;
          var isToQueryable = call != null
              && call.Method.DeclaringType == typeof(Qbservable)
              && call.Method.Name == "ToQueryable";
          if (!isToQueryable)
          {
              throw new ArgumentException(Strings_Providers.EXPECTED_TOQUERYABLE_METHODCALL, "expression");
          }

          //
          // The first argument is the IQbservable<T> on the left-hand side of ToQueryable.
          // Wrap it in ToEnumerable and then AsQueryable to obtain the local queryable.
          //
          var qbservable = call.Arguments[0];
          var asQueryable = AsQueryable.MakeGenericMethod(typeof(TElement));
          var toEnumerable = typeof(Observable).GetMethod("ToEnumerable").MakeGenericMethod(typeof(TElement));
          var pulled = Expression.Call(toEnumerable, qbservable);
          var localQueryable = Expression.Call(asQueryable, pulled);

          //
          // Subsequent Queryable operator calls are the business of the LINQ to Objects
          // provider, so compile and evaluate right away and hand out the IQueryable<T>.
          //
          var factory = Expression.Lambda<Func<IQueryable<TElement>>>(localQueryable).Compile();
          return factory();
      }

      /// <summary>Not implemented.</summary>
      IQueryable IQueryProvider.CreateQuery(Expression expression)
      {
          throw new NotImplementedException();
      }

      /// <summary>Not implemented.</summary>
      TResult IQueryProvider.Execute<TResult>(Expression expression)
      {
          throw new NotImplementedException();
      }

      /// <summary>Not implemented.</summary>
      object IQueryProvider.Execute(Expression expression)
      {
          throw new NotImplementedException();
      }
  }
  /// <summary>
  /// Non-generic base of the query objects. It exposes the source object and the expression
  /// tree of a query independently of the query's element type.
  /// </summary>
  internal class ObservableQuery
  {
      // Expression tree representing the query.
      protected Expression _expression;

      // Source object backing the query; stays null until one has been assigned.
      protected object _source;

      /// <summary>Gets the expression tree representing the query.</summary>
      public Expression Expression
      {
          get { return _expression; }
      }

      /// <summary>Gets the source object backing the query, or null when none has been set.</summary>
      public object Source
      {
          get { return _source; }
      }
  }
  93. internal class ObservableQuery<TSource> : ObservableQuery, IQbservable<TSource>
  94. {
  /// <summary>
  /// Creates a query on top of an existing observable sequence. The query's expression is a
  /// constant node that refers to this query object itself.
  /// </summary>
  /// <param name="source">Observable sequence backing the query.</param>
  internal ObservableQuery(IObservable<TSource> source)
  {
      _expression = Expression.Constant(this);
      _source = source;
  }
  /// <summary>
  /// Creates a query described solely by an expression tree; no source object is set here.
  /// </summary>
  /// <param name="expression">Expression tree representing the query.</param>
  internal ObservableQuery(Expression expression)
  {
      _expression = expression;
  }
  /// <summary>Gets the type of the elements in the sequence, i.e. <c>typeof(TSource)</c>.</summary>
  public Type ElementType
  {
      get
      {
          return typeof(TSource);
      }
  }
  /// <summary>Gets the query provider for this query, which is always <c>Qbservable.Provider</c>.</summary>
  public IQbservableProvider Provider
  {
      get
      {
          return Qbservable.Provider;
      }
  }
  /// <summary>
  /// Subscribes the observer to the sequence behind this query. When no source object is
  /// available yet, the expression tree is rewritten by <c>ObservableRewriter</c>, compiled,
  /// and evaluated; the resulting observable is stored and reused by later calls.
  /// </summary>
  /// <param name="observer">Observer to subscribe.</param>
  /// <returns>The subscription handle returned by the underlying observable.</returns>
  public IDisposable Subscribe(IObserver<TSource> observer)
  {
      if (_source == null)
      {
          var localExpression = new ObservableRewriter().Visit(_expression);
          var factory = Expression.Lambda<Func<IObservable<TSource>>>(localExpression).Compile();
          _source = factory();
      }

      //
      // [OK] Use of unsafe Subscribe: this is a plain mapping onto the IObservable<T>
      // behavior that is equivalent to the expression tree.
      //
      var observable = (IObservable<TSource>)_source;
      return observable.Subscribe/*Unsafe*/(observer);
  }
  /// <summary>
  /// Returns a textual form of the query. When the expression is the constant node that refers
  /// to this very object, the source's string form is returned (or "null" when there is no
  /// source); otherwise the expression tree's string form is returned.
  /// </summary>
  public override string ToString()
  {
      var constant = _expression as ConstantExpression;
      var refersToSelf = constant != null && constant.Value == this;
      if (!refersToSelf)
      {
          return _expression.ToString();
      }

      if (_source == null)
      {
          return "null";
      }

      return _source.ToString();
  }
  137. class ObservableRewriter : ExpressionVisitor
  138. {
  /// <summary>
  /// Replaces constants holding an <c>ObservableQuery</c>: by a constant for the query's source
  /// object when it has one, otherwise by the visited form of the query's own expression tree.
  /// Any other constant is returned unchanged.
  /// </summary>
  protected override Expression VisitConstant(ConstantExpression/*!*/ node)
  {
      var embeddedQuery = node.Value as ObservableQuery;
      if (embeddedQuery == null)
      {
          return node;
      }

      var underlying = embeddedQuery.Source;
      if (underlying == null)
      {
          // No source object yet: descend into the expression tree that defines the query.
          return Visit(embeddedQuery.Expression);
      }

      return Expression.Constant(underlying);
  }
  /// <summary>
  /// Rewrites method calls for local execution:
  ///  - <c>Then</c> and <c>And</c> calls on types deriving directly from <c>QueryablePattern</c> are
  ///    retargeted onto the visited pattern object;
  ///  - calls whose first parameter is an <c>IQbservableProvider</c> or an <c>IQbservable</c> are
  ///    treated as query operators and mapped onto their local counterpart via
  ///    <c>FindObservableMethod</c>;
  ///  - everything else goes through the default visitor.
  /// </summary>
  /// <param name="node">Method call to rewrite.</param>
  /// <returns>The rewritten call, or the original node when an unknown provider is targeted.</returns>
  protected override Expression VisitMethodCall(MethodCallExpression/*!*/ node)
  {
      var method = node.Method;
      var declaringType = method.DeclaringType;
      //
      // DeclaringType is null for global (module-level) methods. Such a method has no type
      // that could derive from QueryablePattern or be mapped to a local implementation
      // type, so fall back to default visiting instead of dereferencing null below.
      //
      if (declaringType == null)
          return base.VisitMethodCall(node);
#if (CRIPPLED_REFLECTION && HAS_WINRT)
      var baseType = declaringType.GetTypeInfo().BaseType;
#else
      var baseType = declaringType.BaseType;
#endif
      if (baseType == typeof(QueryablePattern))
      {
          if (method.Name == "Then")
          {
              //
              // Retarget Then to the corresponding pattern. Recursive visit of the lhs will rewrite
              // the chain of And operators.
              //
              var pattern = Visit(node.Object);
              var arguments = node.Arguments.Select(arg => Unquote(Visit(arg))).ToArray();
              var then = Expression.Call(pattern, "Then", method.GetGenericArguments(), arguments);
              return then;
          }
          else if (method.Name == "And")
          {
              //
              // Retarget And to the corresponding pattern.
              //
              var lhs = Visit(node.Object);
              var arguments = node.Arguments.Select(arg => Visit(arg)).ToArray();
              var and = Expression.Call(lhs, "And", method.GetGenericArguments(), arguments);
              return and;
          }
      }
      else
      {
          var arguments = node.Arguments.AsEnumerable();
          //
          // Checking for an IQbservable operator, being either:
          // - an extension method on IQbservableProvider
          // - an extension method on IQbservable<T>
          //
          var isOperator = false;
          var firstParameter = method.GetParameters().FirstOrDefault();
          if (firstParameter != null)
          {
              var firstParameterType = firstParameter.ParameterType;
              //
              // Operators like Qbservable.Amb have an n-ary form that take in an IQbservableProvider
              // as the first argument. In such a case we need to make sure that the given provider is
              // the one targeting regular Observable. If not, we keep the subtree as-is and let that
              // provider handle the execution.
              //
              if (firstParameterType == typeof(IQbservableProvider))
              {
                  isOperator = true;
                  //
                  // Since we could be inside a lambda expression where one tries to obtain a query
                  // provider, or that provider could be stored in an outer variable, we need to
                  // evaluate the expression to obtain an IQbservableProvider object.
                  //
                  var provider = Expression.Lambda<Func<IQbservableProvider>>(Visit(node.Arguments[0])).Compile()();
                  //
                  // Let's see whether the ObservableQuery provider is targeted. This one always goes
                  // to local execution. E.g.:
                  //
                  //   var xs = Observable.Return(1).AsQbservable();
                  //   var ys = Observable.Return(2).AsQbservable();
                  //   var zs = Observable.Return(3).AsQbservable();
                  //
                  //   var res = Qbservable.Provider.Amb(xs, ys, zs);
                  //             ^^^^^^^^^^^^^^^^^^^
                  //
                  if (provider is ObservableQueryProvider)
                  {
                      //
                      // For further rewrite, simply ignore the query provider argument now to match
                      // up with the Observable signature. E.g.:
                      //
                      //   var res = Qbservable.Provider.Amb(xs, ys, zs);
                      //           = Qbservable.Amb(Qbservable.Provider, xs, ys, zs)
                      //                            ^^^^^^^^^^^^^^^^^^^
                      //   ->
                      //   var res = Observable.Amb(xs, ys, zs);
                      //
                      arguments = arguments.Skip(1);
                  }
                  else
                  {
                      //
                      // We've hit an unknown provider and will defer further execution to it. Upon
                      // calling Subscribe to the node's output, that provider will take care of it.
                      //
                      return node;
                  }
              }
              else if (typeof(IQbservable).IsAssignableFrom(firstParameterType))
              {
                  isOperator = true;
              }
          }
          if (isOperator)
          {
              var args = VisitQbservableOperatorArguments(method, arguments);
              return FindObservableMethod(method, args);
          }
      }
      return base.VisitMethodCall(node);
  }
  /// <summary>
  /// Leaves lambda expressions untouched: the visitor does not descend into lambda bodies
  /// and returns the node exactly as it was given.
  /// </summary>
#if NO_VISITLAMBDAOFT
  protected override Expression VisitLambda(LambdaExpression node)
#else
  protected override Expression VisitLambda<T>(Expression<T> node)
#endif
  {
      // Intentionally no call to the base implementation.
      return node;
  }
  /// <summary>
  /// Visits the arguments of a query operator call and returns them as a mutable list.
  /// </summary>
  /// <param name="method">Operator method being rewritten.</param>
  /// <param name="arguments">Arguments of the call; may be a deferred sequence.</param>
  /// <returns>
  /// The visited arguments. For a method named "When" whose last argument is an array
  /// initializer, a single-element list holding a <c>Plan&lt;T&gt;[]</c> initializer built from the
  /// visited array elements, where T is the method's first generic argument.
  /// </returns>
  private IList<Expression> VisitQbservableOperatorArguments(MethodInfo method, IEnumerable<Expression> arguments)
  {
      //
      // Materialize once: the caller may pass a deferred sequence, and an empty argument
      // list must not make the "When" probing below throw.
      //
      var argumentList = arguments.ToList();
      //
      // Recognize the Qbservable.When<TResult>(IQbservableProvider, QueryablePlan<TResult>[])
      // overload in order to substitute the array with a Plan<TResult>[].
      //
      if (method.Name == "When" && argumentList.Count > 0)
      {
          var paramsArray = argumentList[argumentList.Count - 1] as NewArrayExpression;
          if (paramsArray != null && paramsArray.NodeType == ExpressionType.NewArrayInit)
          {
              return new List<Expression>
              {
                  Expression.NewArrayInit(
                      typeof(Plan<>).MakeGenericType(method.GetGenericArguments()[0]),
                      paramsArray.Expressions.Select(param => Visit(param))
                  )
              };
          }
      }
      return argumentList.Select(arg => Visit(arg)).ToList();
  }
  /// <summary>
  /// Minimal lazy initializer: the factory is invoked on the first read of <see cref="Value"/>
  /// and its result is cached. Reads are serialized with a lock, so the factory runs at most
  /// once unless it throws (in which case the next read invokes it again).
  /// </summary>
  class Lazy<T>
  {
      // Dedicated lock object; locking on the caller-supplied delegate would expose the
      // lock to any code holding a reference to that delegate.
      private readonly object _gate = new object();
      private readonly Func<T> _factory;
      private T _value;
      private bool _initialized;

      /// <summary>Creates a lazy value computed by the given factory.</summary>
      /// <param name="factory">Function producing the value on first access.</param>
      /// <exception cref="ArgumentNullException"><paramref name="factory"/> is null.</exception>
      public Lazy(Func<T> factory)
      {
          if (factory == null)
              throw new ArgumentNullException("factory");
          _factory = factory;
      }

      /// <summary>Gets the value, computing it on first access.</summary>
      public T Value
      {
          get
          {
              lock (_gate)
              {
                  if (!_initialized)
                  {
                      _value = _factory();
                      // Only flag success after the factory returned without throwing.
                      _initialized = true;
                  }
                  return _value;
              }
          }
      }
  }
  // Lookup, by name, of the public static methods on Observable; built on first use.
  private static Lazy<ILookup<string, MethodInfo>> _observableMethods = new Lazy<ILookup<string, MethodInfo>>(() => GetMethods(typeof(Observable)));

  /// <summary>
  /// Finds the local counterpart of a query operator and builds a call to it.
  /// </summary>
  /// <param name="method">Operator method used in the original expression tree.</param>
  /// <param name="arguments">Already visited arguments; entries are unquoted in place.</param>
  /// <returns>A static call to the matching target method.</returns>
  /// <exception cref="InvalidOperationException">No method with a matching name and signature exists on the target type.</exception>
  private static MethodCallExpression FindObservableMethod(MethodInfo method, IList<Expression> arguments)
  {
      //
      // Decide which type hosts the implementation. Operators declared on Qbservable map to
      // Observable; any other declaring type maps to itself, unless it carries a
      // LocalQueryMethodImplementationTypeAttribute pointing elsewhere.
      //
      Type targetType;
      ILookup<string, MethodInfo> candidates;
      if (method.DeclaringType != typeof(Qbservable))
      {
          targetType = method.DeclaringType;
#if (CRIPPLED_REFLECTION && HAS_WINRT)
          var targetInfo = targetType.GetTypeInfo();
          if (targetInfo.IsDefined(typeof(LocalQueryMethodImplementationTypeAttribute), false))
          {
              var redirect = (LocalQueryMethodImplementationTypeAttribute)targetInfo.GetCustomAttributes(typeof(LocalQueryMethodImplementationTypeAttribute), false).Single();
              targetType = redirect.TargetType;
          }
#else
          if (targetType.IsDefined(typeof(LocalQueryMethodImplementationTypeAttribute), false))
          {
              var redirect = (LocalQueryMethodImplementationTypeAttribute)targetType.GetCustomAttributes(typeof(LocalQueryMethodImplementationTypeAttribute), false)[0];
              targetType = redirect.TargetType;
          }
#endif
          candidates = GetMethods(targetType);
      }
      else
      {
          targetType = typeof(Observable);
          candidates = _observableMethods.Value;
      }

      //
      // Among the methods sharing the operator's name, take the first whose signature
      // accepts all arguments.
      //
      Type[] typeArgs = null;
      if (method.IsGenericMethod)
      {
          typeArgs = method.GetGenericArguments();
      }

      var sameName = candidates[method.Name];
      var targetMethod = sameName.FirstOrDefault(candidate => ArgsMatch(candidate, arguments, typeArgs));
      if (targetMethod == null)
      {
          throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Strings_Providers.NO_MATCHING_METHOD_FOUND, method.Name, targetType.Name));
      }

      // Close the generic method definition over the original type arguments.
      if (typeArgs != null)
      {
          targetMethod = targetMethod.MakeGenericMethod(typeArgs);
      }

      //
      // Strip quotes to bridge Expression<Func<...>> arguments to Func<...> parameters.
      //
      var parameterCount = targetMethod.GetParameters().Length;
      for (var index = 0; index < parameterCount; index++)
      {
          arguments[index] = Unquote(arguments[index]);
      }

      // Emit the call to the discovered (static) target method.
      return Expression.Call(null, targetMethod, arguments);
  }
  /// <summary>
  /// Builds a lookup from method name to the public static methods of the given type.
  /// </summary>
  /// <param name="type">Type whose methods are indexed.</param>
  /// <returns>Lookup keyed by method name; overloads share a key.</returns>
  private static ILookup<string, MethodInfo> GetMethods(Type type)
  {
#if (CRIPPLED_REFLECTION && HAS_WINRT)
      var publicStatics = type.GetTypeInfo().DeclaredMethods.Where(m => m.IsStatic && m.IsPublic);
      return publicStatics.ToLookup(m => m.Name);
#else
      var publicStatics = type.GetMethods(BindingFlags.Static | BindingFlags.Public);
      return publicStatics.ToLookup(m => m.Name);
#endif
  }
  /// <summary>
  /// Checks whether a candidate method can be called with the given arguments.
  /// </summary>
  /// <param name="method">Candidate target method, possibly a generic method definition.</param>
  /// <param name="arguments">Arguments of the call, with any IQbservableProvider argument already removed.</param>
  /// <param name="typeArgs">Generic type arguments of the original operator, or null when it is not generic.</param>
  /// <returns>
  /// True when parameter count and genericity agree and every argument's type, as-is or after
  /// stripping quotes, is assignable to the corresponding parameter type.
  /// </returns>
  private static bool ArgsMatch(MethodInfo method, IList<Expression> arguments, Type[] typeArgs)
  {
      // Arity has to agree.
      var candidateParameters = method.GetParameters();
      if (candidateParameters.Length != arguments.Count)
      {
          return false;
      }

      // A non-generic candidate cannot stand in for a generic operator.
      var hasTypeArgs = typeArgs != null && typeArgs.Length > 0;
      if (hasTypeArgs && !method.IsGenericMethod)
      {
          return false;
      }

      // Close a generic definition over the operator's type arguments to get concrete parameter types.
      if (method.IsGenericMethodDefinition)
      {
          if (typeArgs == null)
          {
              return false;
          }

          if (method.GetGenericArguments().Length != typeArgs.Length)
          {
              return false;
          }

          candidateParameters = method.MakeGenericMethod(typeArgs).GetParameters();
      }

      for (var index = 0; index < arguments.Count; index++)
      {
          var expected = candidateParameters[index].ParameterType;
          var actual = arguments[index];
          if (expected.IsAssignableFrom(actual.Type))
          {
              continue;
          }

          //
          // Operators taking a function (like Where, Select) receive a quoted lambda, so
          // Expression<Func<...>> has to be compared against Func<...> after unquoting.
          //
          if (!expected.IsAssignableFrom(Unquote(actual).Type))
          {
              return false;
          }
      }

      return true;
  }
  /// <summary>
  /// Strips every outer Quote node from an expression.
  /// </summary>
  /// <param name="expression">Expression that may be wrapped in one or more quotes.</param>
  /// <returns>The first non-Quote expression found; the input itself when it is not quoted.</returns>
  private static Expression Unquote(Expression expression)
  {
      var current = expression;
      for (; current.NodeType == ExpressionType.Quote; current = ((UnaryExpression)current).Operand)
      {
          // Keep peeling; the operand of a quote becomes the next candidate.
      }

      return current;
  }
  441. }
  442. }
  443. #if (CRIPPLED_REFLECTION && HAS_WINRT)
  /// <summary>
  /// Extension methods that route <c>GetMethod</c> and <c>IsAssignableFrom</c> calls on
  /// <c>Type</c> through <c>TypeInfo</c>.
  /// </summary>
  static class Helpers
  {
      /// <summary>Returns the method with the given name declared on the type, via <c>TypeInfo.GetDeclaredMethod</c>.</summary>
      public static MethodInfo GetMethod(this Type type, string name)
      {
          var info = type.GetTypeInfo();
          return info.GetDeclaredMethod(name);
      }

      /// <summary>Determines whether a value of <paramref name="type2"/> can be assigned to <paramref name="type1"/>.</summary>
      public static bool IsAssignableFrom(this Type type1, Type type2)
      {
          var target = type1.GetTypeInfo();
          var candidate = type2.GetTypeInfo();
          return target.IsAssignableFrom(candidate);
      }
  }
  455. #endif
  456. }