// QueryLanguage.Creation.cs (15 KB)
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Collections.Generic;
  3. using System.Reactive.Concurrency;
  4. using System.Reactive.Disposables;
  5. using System.Threading;
  6. using System.Linq;
  7. #if !NO_TPL
  8. using System.Reactive.Threading.Tasks;
  9. using System.Threading.Tasks;
  10. #endif
  11. namespace System.Reactive.Linq
  12. {
  13. #if !NO_PERF
  14. using ObservableImpl;
  15. #endif
  16. internal partial class QueryLanguage
  17. {
  18. #region - Create -
  /// <summary>
  /// Builds an observable sequence from a subscribe delegate that returns an <see cref="IDisposable"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the produced sequence.</typeparam>
  /// <param name="subscribe">Delegate passed straight to the <c>AnonymousObservable</c> constructor.</param>
  /// <returns>An <c>AnonymousObservable&lt;TSource&gt;</c> wrapping <paramref name="subscribe"/>.</returns>
  public virtual IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
  {
      IObservable<TSource> sequence = new AnonymousObservable<TSource>(subscribe);
      return sequence;
  }
  /// <summary>
  /// Builds an observable sequence from a subscribe delegate that returns a teardown <see cref="Action"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the produced sequence.</typeparam>
  /// <param name="subscribe">Delegate invoked with the observer; its result may be <c>null</c>.</param>
  /// <returns>
  /// An <c>AnonymousObservable&lt;TSource&gt;</c> whose subscription is <c>Disposable.Create(action)</c>
  /// for a non-null action, or <c>Disposable.Empty</c> when the delegate returned <c>null</c>.
  /// </returns>
  public virtual IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, Action> subscribe)
  {
      return new AnonymousObservable<TSource>(observer =>
      {
          var teardown = subscribe(observer);

          // A null action means there is nothing to run on disposal.
          if (teardown == null)
          {
              return Disposable.Empty;
          }

          return Disposable.Create(teardown);
      });
  }
  31. #endregion
  32. #region - CreateAsync -
  33. #if !NO_TPL
  /// <summary>
  /// Builds an observable sequence whose subscription runs the asynchronous <paramref name="subscribeAsync"/> delegate.
  /// </summary>
  /// <typeparam name="TResult">Element type of the produced sequence.</typeparam>
  /// <param name="subscribeAsync">
  /// Delegate invoked with the observer and the token of a fresh <c>CancellationDisposable</c>.
  /// </param>
  /// <returns>
  /// An <c>AnonymousObservable&lt;TResult&gt;</c>; each subscription returns a composite of the
  /// <c>CancellationDisposable</c> and the subscription to the task's observable.
  /// </returns>
  public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync)
  {
      return new AnonymousObservable<TResult>(observer =>
      {
          var cancellation = new CancellationDisposable();

          var pendingTask = subscribeAsync(observer, cancellation.Token);
          var taskSignal = pendingTask.ToObservable();

          // OnNext goes to Stubs<Unit>.Ignore; the task's error and completion are forwarded to the observer.
          var relay = new AnonymousObserver<Unit>(Stubs<Unit>.Ignore, observer.OnError, observer.OnCompleted);

          return StableCompositeDisposable.Create(cancellation, taskSignal.Subscribe(relay));
      });
  }
  /// <summary>
  /// Builds an observable sequence from an asynchronous subscribe delegate that takes no cancellation token.
  /// </summary>
  /// <typeparam name="TResult">Element type of the produced sequence.</typeparam>
  /// <param name="subscribeAsync">Delegate invoked with the observer only.</param>
  /// <returns>The result of the token-accepting <c>Create</c> overload, called with an adapter that drops the token.</returns>
  public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task> subscribeAsync)
  {
      Func<IObserver<TResult>, CancellationToken, Task> tokenIgnoringAdapter =
          (observer, ignoredToken) => subscribeAsync(observer);

      return Create<TResult>(tokenIgnoringAdapter);
  }
  /// <summary>
  /// Builds an observable sequence whose subscription resource is produced asynchronously as an <see cref="IDisposable"/>.
  /// </summary>
  /// <typeparam name="TResult">Element type of the produced sequence.</typeparam>
  /// <param name="subscribeAsync">
  /// Delegate invoked with the observer and the token of a fresh <c>CancellationDisposable</c>;
  /// the task's result (or <c>Disposable.Empty</c> when it is <c>null</c>) is stored in a
  /// <c>SingleAssignmentDisposable</c>.
  /// </param>
  /// <returns>
  /// An <c>AnonymousObservable&lt;TResult&gt;</c>; each subscription returns a composite of the
  /// <c>CancellationDisposable</c> and the <c>SingleAssignmentDisposable</c>.
  /// </returns>
  public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> subscribeAsync)
  {
      return new AnonymousObservable<TResult>(observer =>
      {
          var resourceSlot = new SingleAssignmentDisposable();
          var cancellation = new CancellationDisposable();

          var pendingResource = subscribeAsync(observer, cancellation.Token).ToObservable();

          // Task errors go to the observer; task completion itself is routed to Stubs.Nop.
          var resourceSink = new AnonymousObserver<IDisposable>(
              resource => { resourceSlot.Disposable = resource ?? Disposable.Empty; },
              observer.OnError,
              Stubs.Nop);

          //
          // The subscription made below is deliberately never cancelled, so that the resource returned by
          // the task is eventually disposed. Because AnonymousObservable<T> is used, auto-detach behavior
          // comes for free.
          //
          pendingResource.Subscribe(resourceSink);

          return StableCompositeDisposable.Create(cancellation, resourceSlot);
      });
  }
  /// <summary>
  /// Builds an observable sequence from an asynchronous subscribe delegate that yields an
  /// <see cref="IDisposable"/> and takes no cancellation token.
  /// </summary>
  /// <typeparam name="TResult">Element type of the produced sequence.</typeparam>
  /// <param name="subscribeAsync">Delegate invoked with the observer only.</param>
  /// <returns>The result of the token-accepting <c>Create</c> overload, called with an adapter that drops the token.</returns>
  public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task<IDisposable>> subscribeAsync)
  {
      Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> tokenIgnoringAdapter =
          (observer, ignoredToken) => subscribeAsync(observer);

      return Create<TResult>(tokenIgnoringAdapter);
  }
  /// <summary>
  /// Builds an observable sequence whose teardown logic is produced asynchronously as an <see cref="Action"/>.
  /// </summary>
  /// <typeparam name="TResult">Element type of the produced sequence.</typeparam>
  /// <param name="subscribeAsync">
  /// Delegate invoked with the observer and the token of a fresh <c>CancellationDisposable</c>;
  /// a non-null resulting action is wrapped with <c>Disposable.Create</c>, a <c>null</c> one becomes
  /// <c>Disposable.Empty</c>.
  /// </param>
  /// <returns>
  /// An <c>AnonymousObservable&lt;TResult&gt;</c>; each subscription returns a composite of the
  /// <c>CancellationDisposable</c> and the <c>SingleAssignmentDisposable</c> holding the teardown.
  /// </returns>
  public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task<Action>> subscribeAsync)
  {
      return new AnonymousObservable<TResult>(observer =>
      {
          var teardownSlot = new SingleAssignmentDisposable();
          var cancellation = new CancellationDisposable();

          var pendingTeardown = subscribeAsync(observer, cancellation.Token).ToObservable();

          // Task errors go to the observer; task completion itself is routed to Stubs.Nop.
          var teardownSink = new AnonymousObserver<Action>(
              teardown =>
              {
                  if (teardown != null)
                  {
                      teardownSlot.Disposable = Disposable.Create(teardown);
                  }
                  else
                  {
                      teardownSlot.Disposable = Disposable.Empty;
                  }
              },
              observer.OnError,
              Stubs.Nop);

          //
          // The subscription made below is deliberately never cancelled, so that the resource returned by
          // the task eventually gets disposed. Because AnonymousObservable<T> is used, auto-detach behavior
          // comes for free.
          //
          pendingTeardown.Subscribe(teardownSink);

          return StableCompositeDisposable.Create(cancellation, teardownSlot);
      });
  }
  /// <summary>
  /// Builds an observable sequence from an asynchronous subscribe delegate that yields a teardown
  /// <see cref="Action"/> and takes no cancellation token.
  /// </summary>
  /// <typeparam name="TResult">Element type of the produced sequence.</typeparam>
  /// <param name="subscribeAsync">Delegate invoked with the observer only.</param>
  /// <returns>The result of the token-accepting <c>Create</c> overload, called with an adapter that drops the token.</returns>
  public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task<Action>> subscribeAsync)
  {
      Func<IObserver<TResult>, CancellationToken, Task<Action>> tokenIgnoringAdapter =
          (observer, ignoredToken) => subscribeAsync(observer);

      return Create<TResult>(tokenIgnoringAdapter);
  }
  89. #endif
  90. #endregion
  91. #region + Defer +
  /// <summary>
  /// Builds an observable sequence that obtains its source from <paramref name="observableFactory"/>.
  /// </summary>
  /// <typeparam name="TValue">Element type of the produced sequence.</typeparam>
  /// <param name="observableFactory">Factory producing the sequence to subscribe to.</param>
  /// <returns>
  /// A <c>Defer&lt;TValue&gt;</c> instance when NO_PERF is not defined. Otherwise an
  /// <c>AnonymousObservable&lt;TValue&gt;</c> that calls the factory on subscription and substitutes
  /// <c>Throw&lt;TValue&gt;(exception)</c> when the factory throws.
  /// </returns>
  public virtual IObservable<TValue> Defer<TValue>(Func<IObservable<TValue>> observableFactory)
  {
  #if !NO_PERF
      return new Defer<TValue>(observableFactory);
  #else
      return new AnonymousObservable<TValue>(observer =>
      {
          var source = default(IObservable<TValue>);

          try
          {
              source = observableFactory();
          }
          catch (Exception error)
          {
              // A failing factory is surfaced to the observer as an error sequence.
              source = Throw<TValue>(error);
          }

          return source.Subscribe(observer);
      });
  #endif
  }
  112. #endregion
  113. #region + DeferAsync +
  114. #if !NO_TPL
  /// <summary>
  /// Builds a deferred observable sequence whose source is produced by an asynchronous factory.
  /// </summary>
  /// <typeparam name="TValue">Element type of the produced sequence.</typeparam>
  /// <param name="observableFactoryAsync">Asynchronous factory handed to <c>StartAsync</c> inside the deferred factory.</param>
  /// <returns>The synchronous <c>Defer</c> overload applied to a factory returning <c>StartAsync(...).Merge()</c>.</returns>
  public virtual IObservable<TValue> Defer<TValue>(Func<Task<IObservable<TValue>>> observableFactoryAsync)
  {
      Func<IObservable<TValue>> deferredFactory = () =>
      {
          var started = StartAsync(observableFactoryAsync);
          return started.Merge();
      };

      return Defer<TValue>(deferredFactory);
  }
  /// <summary>
  /// Builds a deferred observable sequence whose source is produced by a cancellable asynchronous factory.
  /// </summary>
  /// <typeparam name="TValue">Element type of the produced sequence.</typeparam>
  /// <param name="observableFactoryAsync">Token-accepting asynchronous factory handed to <c>StartAsync</c> inside the deferred factory.</param>
  /// <returns>The synchronous <c>Defer</c> overload applied to a factory returning <c>StartAsync(...).Merge()</c>.</returns>
  public virtual IObservable<TValue> Defer<TValue>(Func<CancellationToken, Task<IObservable<TValue>>> observableFactoryAsync)
  {
      Func<IObservable<TValue>> deferredFactory = () =>
      {
          var started = StartAsync(observableFactoryAsync);
          return started.Merge();
      };

      return Defer<TValue>(deferredFactory);
  }
  123. #endif
  124. #endregion
  125. #region + Empty +
  /// <summary>
  /// Builds an empty observable sequence using <c>SchedulerDefaults.ConstantTimeOperations</c>.
  /// </summary>
  /// <typeparam name="TResult">Element type of the produced sequence.</typeparam>
  /// <returns>
  /// An <c>Empty&lt;TResult&gt;</c> instance when NO_PERF is not defined; otherwise the result of <c>Empty_</c>.
  /// </returns>
  public virtual IObservable<TResult> Empty<TResult>()
  {
      var defaultScheduler = SchedulerDefaults.ConstantTimeOperations;
  #if !NO_PERF
      return new Empty<TResult>(defaultScheduler);
  #else
      return Empty_<TResult>(defaultScheduler);
  #endif
  }
  /// <summary>
  /// Builds an empty observable sequence using the supplied scheduler.
  /// </summary>
  /// <typeparam name="TResult">Element type of the produced sequence.</typeparam>
  /// <param name="scheduler">Scheduler forwarded to the implementation.</param>
  /// <returns>
  /// An <c>Empty&lt;TResult&gt;</c> instance when NO_PERF is not defined; otherwise the result of <c>Empty_</c>.
  /// </returns>
  public virtual IObservable<TResult> Empty<TResult>(IScheduler scheduler)
  {
      IObservable<TResult> sequence;
  #if !NO_PERF
      sequence = new Empty<TResult>(scheduler);
  #else
      sequence = Empty_<TResult>(scheduler);
  #endif
      return sequence;
  }
  #if NO_PERF
  /// <summary>
  /// NO_PERF fallback for <c>Empty</c>: schedules the observer's <c>OnCompleted</c> on <paramref name="scheduler"/>.
  /// </summary>
  /// <typeparam name="TResult">Element type of the produced sequence.</typeparam>
  /// <param name="scheduler">Scheduler on which completion is scheduled.</param>
  /// <returns>An <c>AnonymousObservable&lt;TResult&gt;</c> whose subscription is the scheduled work item.</returns>
  private static IObservable<TResult> Empty_<TResult>(IScheduler scheduler)
  {
      return new AnonymousObservable<TResult>(observer =>
      {
          Action complete = observer.OnCompleted;
          return scheduler.Schedule(complete);
      });
  }
  #endif
  148. #endregion
  149. #region + Generate +
  /// <summary>
  /// Builds a generated observable sequence using <c>SchedulerDefaults.Iteration</c>.
  /// </summary>
  /// <typeparam name="TState">Type of the iteration state.</typeparam>
  /// <typeparam name="TResult">Element type of the produced sequence.</typeparam>
  /// <param name="initialState">Starting state.</param>
  /// <param name="condition">Predicate over the state, forwarded to the implementation.</param>
  /// <param name="iterate">State-advancing function, forwarded to the implementation.</param>
  /// <param name="resultSelector">Projection from state to element, forwarded to the implementation.</param>
  /// <returns>
  /// A <c>Generate&lt;TState, TResult&gt;</c> instance when NO_PERF is not defined; otherwise the result of <c>Generate_</c>.
  /// </returns>
  public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector)
  {
      var defaultScheduler = SchedulerDefaults.Iteration;
  #if !NO_PERF
      return new Generate<TState, TResult>(initialState, condition, iterate, resultSelector, defaultScheduler);
  #else
      return Generate_<TState, TResult>(initialState, condition, iterate, resultSelector, defaultScheduler);
  #endif
  }
  /// <summary>
  /// Builds a generated observable sequence using the supplied scheduler.
  /// </summary>
  /// <typeparam name="TState">Type of the iteration state.</typeparam>
  /// <typeparam name="TResult">Element type of the produced sequence.</typeparam>
  /// <param name="initialState">Starting state.</param>
  /// <param name="condition">Predicate over the state, forwarded to the implementation.</param>
  /// <param name="iterate">State-advancing function, forwarded to the implementation.</param>
  /// <param name="resultSelector">Projection from state to element, forwarded to the implementation.</param>
  /// <param name="scheduler">Scheduler forwarded to the implementation.</param>
  /// <returns>
  /// A <c>Generate&lt;TState, TResult&gt;</c> instance when NO_PERF is not defined; otherwise the result of <c>Generate_</c>.
  /// </returns>
  public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, IScheduler scheduler)
  {
      IObservable<TResult> sequence;
  #if !NO_PERF
      sequence = new Generate<TState, TResult>(initialState, condition, iterate, resultSelector, scheduler);
  #else
      sequence = Generate_<TState, TResult>(initialState, condition, iterate, resultSelector, scheduler);
  #endif
      return sequence;
  }
  #if NO_PERF
  /// <summary>
  /// NO_PERF fallback for <c>Generate</c>: a recursively scheduled loop over the state.
  /// </summary>
  /// <remarks>
  /// Each scheduled step advances the state with <paramref name="iterate"/> (except on the first step),
  /// evaluates <paramref name="condition"/>, and either emits <paramref name="resultSelector"/>'s value and
  /// reschedules itself, or completes. An exception thrown by any of the three delegates is sent to
  /// <c>OnError</c> and ends the loop.
  /// </remarks>
  /// <typeparam name="TState">Type of the iteration state.</typeparam>
  /// <typeparam name="TResult">Element type of the produced sequence.</typeparam>
  /// <param name="initialState">Starting state.</param>
  /// <param name="condition">Predicate deciding whether another element is produced.</param>
  /// <param name="iterate">Function computing the next state.</param>
  /// <param name="resultSelector">Projection from state to emitted element.</param>
  /// <param name="scheduler">Scheduler running the loop.</param>
  /// <returns>An <c>AnonymousObservable&lt;TResult&gt;</c> whose subscription is the scheduled work.</returns>
  private static IObservable<TResult> Generate_<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, IScheduler scheduler)
  {
      return new AnonymousObservable<TResult>(observer =>
      {
          var current = initialState;
          var isFirstStep = true;

          return scheduler.Schedule(recurse =>
          {
              var shouldEmit = false;
              var element = default(TResult);

              try
              {
                  // The initial state is used as-is; later steps advance it first.
                  if (!isFirstStep)
                  {
                      current = iterate(current);
                  }
                  else
                  {
                      isFirstStep = false;
                  }

                  shouldEmit = condition(current);

                  if (shouldEmit)
                  {
                      element = resultSelector(current);
                  }
              }
              catch (Exception error)
              {
                  observer.OnError(error);
                  return;
              }

              if (!shouldEmit)
              {
                  observer.OnCompleted();
                  return;
              }

              observer.OnNext(element);
              recurse();
          });
      });
  }
  #endif
  203. #endregion
  204. #region + Never +
  /// <summary>
  /// Builds an observable sequence that does not signal its observers.
  /// </summary>
  /// <typeparam name="TResult">Element type of the produced sequence.</typeparam>
  /// <returns>
  /// A <c>Never&lt;TResult&gt;</c> instance when NO_PERF is not defined. Otherwise an
  /// <c>AnonymousObservable&lt;TResult&gt;</c> whose subscribe delegate leaves the observer untouched and
  /// returns <c>Disposable.Empty</c>.
  /// </returns>
  public virtual IObservable<TResult> Never<TResult>()
  {
      IObservable<TResult> sequence;
  #if !NO_PERF
      sequence = new Never<TResult>();
  #else
      sequence = new AnonymousObservable<TResult>(ignoredObserver => Disposable.Empty);
  #endif
      return sequence;
  }
  213. #endregion
  214. #region + Range +
  /// <summary>
  /// Builds an integer range sequence using <c>SchedulerDefaults.Iteration</c>.
  /// </summary>
  /// <param name="start">First value of the range.</param>
  /// <param name="count">Number of values, forwarded to <c>Range_</c>.</param>
  /// <returns>The result of <c>Range_</c>.</returns>
  public virtual IObservable<int> Range(int start, int count)
  {
      var defaultScheduler = SchedulerDefaults.Iteration;
      return Range_(start, count, defaultScheduler);
  }
  /// <summary>
  /// Builds an integer range sequence using the supplied scheduler.
  /// </summary>
  /// <param name="start">First value of the range.</param>
  /// <param name="count">Number of values, forwarded to <c>Range_</c>.</param>
  /// <param name="scheduler">Scheduler forwarded to <c>Range_</c>.</param>
  /// <returns>The result of <c>Range_</c>.</returns>
  public virtual IObservable<int> Range(int start, int count, IScheduler scheduler)
  {
      var sequence = Range_(start, count, scheduler);
      return sequence;
  }
  /// <summary>
  /// Shared implementation behind both <c>Range</c> overloads.
  /// </summary>
  /// <param name="start">First value of the range.</param>
  /// <param name="count">Number of values to produce.</param>
  /// <param name="scheduler">Scheduler running the iteration.</param>
  /// <returns>
  /// A <c>Range</c> instance when NO_PERF is not defined. Otherwise an <c>AnonymousObservable&lt;int&gt;</c>
  /// that recursively schedules an offset from 0: while the offset is below <paramref name="count"/> it
  /// emits <c>start + offset</c>, and it completes once the offset reaches <paramref name="count"/>.
  /// </returns>
  private static IObservable<int> Range_(int start, int count, IScheduler scheduler)
  {
  #if !NO_PERF
      return new Range(start, count, scheduler);
  #else
      return new AnonymousObservable<int>(observer =>
          scheduler.Schedule(0, (offset, recurse) =>
          {
              if (offset >= count)
              {
                  observer.OnCompleted();
                  return;
              }

              observer.OnNext(start + offset);
              recurse(offset + 1);
          }));
  #endif
  }
  243. #endregion
  244. #region + Repeat +
  /// <summary>
  /// Builds a sequence repeating <paramref name="value"/> with no repeat count, using <c>SchedulerDefaults.Iteration</c>.
  /// </summary>
  /// <typeparam name="TResult">Element type of the produced sequence.</typeparam>
  /// <param name="value">Element to repeat.</param>
  /// <returns>
  /// A <c>Repeat&lt;TResult&gt;</c> constructed with a <c>null</c> count when NO_PERF is not defined;
  /// otherwise the result of <c>Repeat_</c>.
  /// </returns>
  public virtual IObservable<TResult> Repeat<TResult>(TResult value)
  {
      var defaultScheduler = SchedulerDefaults.Iteration;
  #if !NO_PERF
      return new Repeat<TResult>(value, null, defaultScheduler);
  #else
      return Repeat_(value, defaultScheduler);
  #endif
  }
  /// <summary>
  /// Builds a sequence repeating <paramref name="value"/> with no repeat count, using the supplied scheduler.
  /// </summary>
  /// <typeparam name="TResult">Element type of the produced sequence.</typeparam>
  /// <param name="value">Element to repeat.</param>
  /// <param name="scheduler">Scheduler forwarded to the implementation.</param>
  /// <returns>
  /// A <c>Repeat&lt;TResult&gt;</c> constructed with a <c>null</c> count when NO_PERF is not defined;
  /// otherwise the result of <c>Repeat_</c>.
  /// </returns>
  public virtual IObservable<TResult> Repeat<TResult>(TResult value, IScheduler scheduler)
  {
      IObservable<TResult> sequence;
  #if !NO_PERF
      sequence = new Repeat<TResult>(value, null, scheduler);
  #else
      sequence = Repeat_<TResult>(value, scheduler);
  #endif
      return sequence;
  }
  #if NO_PERF
  /// <summary>
  /// NO_PERF fallback for the count-less <c>Repeat</c>: applies <c>Repeat()</c> to <c>Return(value, scheduler)</c>.
  /// </summary>
  /// <typeparam name="TResult">Element type of the produced sequence.</typeparam>
  /// <param name="value">Element to repeat.</param>
  /// <param name="scheduler">Scheduler passed to <c>Return</c>.</param>
  /// <returns>The repeated single-element sequence.</returns>
  private IObservable<TResult> Repeat_<TResult>(TResult value, IScheduler scheduler)
  {
      var single = Return(value, scheduler);
      return single.Repeat();
  }
  #endif
  /// <summary>
  /// Builds a sequence repeating <paramref name="value"/> with the given count, using <c>SchedulerDefaults.Iteration</c>.
  /// </summary>
  /// <typeparam name="TResult">Element type of the produced sequence.</typeparam>
  /// <param name="value">Element to repeat.</param>
  /// <param name="repeatCount">Repeat count forwarded to the implementation.</param>
  /// <returns>
  /// A <c>Repeat&lt;TResult&gt;</c> instance when NO_PERF is not defined; otherwise the result of <c>Repeat_</c>.
  /// </returns>
  public virtual IObservable<TResult> Repeat<TResult>(TResult value, int repeatCount)
  {
      var defaultScheduler = SchedulerDefaults.Iteration;
  #if !NO_PERF
      return new Repeat<TResult>(value, repeatCount, defaultScheduler);
  #else
      return Repeat_(value, repeatCount, defaultScheduler);
  #endif
  }
  /// <summary>
  /// Builds a sequence repeating <paramref name="value"/> with the given count, using the supplied scheduler.
  /// </summary>
  /// <typeparam name="TResult">Element type of the produced sequence.</typeparam>
  /// <param name="value">Element to repeat.</param>
  /// <param name="repeatCount">Repeat count forwarded to the implementation.</param>
  /// <param name="scheduler">Scheduler forwarded to the implementation.</param>
  /// <returns>
  /// A <c>Repeat&lt;TResult&gt;</c> instance when NO_PERF is not defined; otherwise the result of <c>Repeat_</c>.
  /// </returns>
  public virtual IObservable<TResult> Repeat<TResult>(TResult value, int repeatCount, IScheduler scheduler)
  {
      IObservable<TResult> sequence;
  #if !NO_PERF
      sequence = new Repeat<TResult>(value, repeatCount, scheduler);
  #else
      sequence = Repeat_(value, repeatCount, scheduler);
  #endif
      return sequence;
  }
  #if NO_PERF
  /// <summary>
  /// NO_PERF fallback for the counted <c>Repeat</c>: applies <c>Repeat(repeatCount)</c> to <c>Return(value, scheduler)</c>.
  /// </summary>
  /// <typeparam name="TResult">Element type of the produced sequence.</typeparam>
  /// <param name="value">Element to repeat.</param>
  /// <param name="repeatCount">Count passed to the <c>Repeat</c> operator.</param>
  /// <param name="scheduler">Scheduler passed to <c>Return</c>.</param>
  /// <returns>The repeated single-element sequence.</returns>
  private IObservable<TResult> Repeat_<TResult>(TResult value, int repeatCount, IScheduler scheduler)
  {
      var single = Return(value, scheduler);
      return single.Repeat(repeatCount);
  }
  #endif
  289. #endregion
  290. #region + Return +
  /// <summary>
  /// Builds a single-element sequence using <c>SchedulerDefaults.ConstantTimeOperations</c>.
  /// </summary>
  /// <typeparam name="TResult">Element type of the produced sequence.</typeparam>
  /// <param name="value">The element to produce.</param>
  /// <returns>
  /// A <c>Return&lt;TResult&gt;</c> instance when NO_PERF is not defined; otherwise the result of <c>Return_</c>.
  /// </returns>
  public virtual IObservable<TResult> Return<TResult>(TResult value)
  {
      var defaultScheduler = SchedulerDefaults.ConstantTimeOperations;
  #if !NO_PERF
      return new Return<TResult>(value, defaultScheduler);
  #else
      return Return_<TResult>(value, defaultScheduler);
  #endif
  }
  /// <summary>
  /// Builds a single-element sequence using the supplied scheduler.
  /// </summary>
  /// <typeparam name="TResult">Element type of the produced sequence.</typeparam>
  /// <param name="value">The element to produce.</param>
  /// <param name="scheduler">Scheduler forwarded to the implementation.</param>
  /// <returns>
  /// A <c>Return&lt;TResult&gt;</c> instance when NO_PERF is not defined; otherwise the result of <c>Return_</c>.
  /// </returns>
  public virtual IObservable<TResult> Return<TResult>(TResult value, IScheduler scheduler)
  {
      IObservable<TResult> sequence;
  #if !NO_PERF
      sequence = new Return<TResult>(value, scheduler);
  #else
      sequence = Return_<TResult>(value, scheduler);
  #endif
      return sequence;
  }
  #if NO_PERF
  /// <summary>
  /// NO_PERF fallback for <c>Return</c>: schedules one work item that calls <c>OnNext(value)</c>
  /// followed by <c>OnCompleted()</c>.
  /// </summary>
  /// <typeparam name="TResult">Element type of the produced sequence.</typeparam>
  /// <param name="value">The element to produce.</param>
  /// <param name="scheduler">Scheduler on which the work item is scheduled.</param>
  /// <returns>An <c>AnonymousObservable&lt;TResult&gt;</c> whose subscription is the scheduled work item.</returns>
  private static IObservable<TResult> Return_<TResult>(TResult value, IScheduler scheduler)
  {
      return new AnonymousObservable<TResult>(observer =>
      {
          Action emitThenComplete = () =>
          {
              observer.OnNext(value);
              observer.OnCompleted();
          };

          return scheduler.Schedule(emitThenComplete);
      });
  }
  #endif
  319. #endregion
  320. #region + Throw +
  /// <summary>
  /// Builds a sequence that signals <paramref name="exception"/>, using <c>SchedulerDefaults.ConstantTimeOperations</c>.
  /// </summary>
  /// <typeparam name="TResult">Element type of the produced sequence.</typeparam>
  /// <param name="exception">Exception forwarded to the implementation.</param>
  /// <returns>
  /// A <c>Throw&lt;TResult&gt;</c> instance when NO_PERF is not defined; otherwise the result of <c>Throw_</c>.
  /// </returns>
  public virtual IObservable<TResult> Throw<TResult>(Exception exception)
  {
      var defaultScheduler = SchedulerDefaults.ConstantTimeOperations;
  #if !NO_PERF
      return new Throw<TResult>(exception, defaultScheduler);
  #else
      return Throw_<TResult>(exception, defaultScheduler);
  #endif
  }
  /// <summary>
  /// Builds a sequence that signals <paramref name="exception"/>, using the supplied scheduler.
  /// </summary>
  /// <typeparam name="TResult">Element type of the produced sequence.</typeparam>
  /// <param name="exception">Exception forwarded to the implementation.</param>
  /// <param name="scheduler">Scheduler forwarded to the implementation.</param>
  /// <returns>
  /// A <c>Throw&lt;TResult&gt;</c> instance when NO_PERF is not defined; otherwise the result of <c>Throw_</c>.
  /// </returns>
  public virtual IObservable<TResult> Throw<TResult>(Exception exception, IScheduler scheduler)
  {
      IObservable<TResult> sequence;
  #if !NO_PERF
      sequence = new Throw<TResult>(exception, scheduler);
  #else
      sequence = Throw_<TResult>(exception, scheduler);
  #endif
      return sequence;
  }
  #if NO_PERF
  /// <summary>
  /// NO_PERF fallback for <c>Throw</c>: schedules a work item that calls <c>OnError(exception)</c>.
  /// </summary>
  /// <typeparam name="TResult">Element type of the produced sequence.</typeparam>
  /// <param name="exception">Exception passed to the observer's <c>OnError</c>.</param>
  /// <param name="scheduler">Scheduler on which the work item is scheduled.</param>
  /// <returns>An <c>AnonymousObservable&lt;TResult&gt;</c> whose subscription is the scheduled work item.</returns>
  private static IObservable<TResult> Throw_<TResult>(Exception exception, IScheduler scheduler)
  {
      return new AnonymousObservable<TResult>(observer =>
      {
          Action fail = () => observer.OnError(exception);
          return scheduler.Schedule(fail);
      });
  }
  #endif
  343. #endregion
  344. #region + Using +
  /// <summary>
  /// Builds an observable sequence that depends on a disposable resource.
  /// </summary>
  /// <typeparam name="TSource">Element type of the produced sequence.</typeparam>
  /// <typeparam name="TResource">Type of the resource; must implement <see cref="IDisposable"/>.</typeparam>
  /// <param name="resourceFactory">Factory creating the resource.</param>
  /// <param name="observableFactory">Factory creating the sequence from the resource.</param>
  /// <returns>
  /// A <c>Using&lt;TSource, TResource&gt;</c> instance when NO_PERF is not defined. Otherwise an
  /// <c>AnonymousObservable&lt;TSource&gt;</c> whose subscription is a <c>CompositeDisposable</c> of the
  /// source subscription and the resource (or <c>Disposable.Empty</c> when no non-null resource was
  /// obtained). If either factory throws, <c>Throw&lt;TSource&gt;(exception)</c> is subscribed instead.
  /// </returns>
  public virtual IObservable<TSource> Using<TSource, TResource>(Func<TResource> resourceFactory, Func<TResource, IObservable<TSource>> observableFactory) where TResource : IDisposable
  {
  #if !NO_PERF
      return new Using<TSource, TResource>(resourceFactory, observableFactory);
  #else
      return new AnonymousObservable<TSource>(observer =>
      {
          var sequence = default(IObservable<TSource>);
          var resourceHandle = Disposable.Empty;

          try
          {
              var resource = resourceFactory();

              // Track the resource before calling the second factory, so it is still part of the
              // returned composite if that factory throws.
              if (resource != null)
              {
                  resourceHandle = resource;
              }

              sequence = observableFactory(resource);
          }
          catch (Exception error)
          {
              sequence = Throw<TSource>(error);
          }

          return new CompositeDisposable(sequence.Subscribe(observer), resourceHandle);
      });
  #endif
  }
  369. #endregion
  370. #region - UsingAsync -
  371. #if !NO_TPL
  /// <summary>
  /// Builds an observable sequence that depends on a resource, where both the resource and the
  /// sequence are obtained asynchronously.
  /// </summary>
  /// <typeparam name="TSource">Element type of the produced sequence.</typeparam>
  /// <typeparam name="TResource">Type of the resource; must implement <see cref="IDisposable"/>.</typeparam>
  /// <param name="resourceFactoryAsync">Asynchronous resource factory, passed to <c>Observable.FromAsync</c>.</param>
  /// <param name="observableFactoryAsync">Asynchronous sequence factory, called with the resource and a cancellation token.</param>
  /// <returns>
  /// The <c>SelectMany</c> of the resource sequence onto <c>Observable.Using</c>, whose sequence factory
  /// is <c>Observable.FromAsync(...).Merge()</c> over <paramref name="observableFactoryAsync"/>.
  /// </returns>
  public virtual IObservable<TSource> Using<TSource, TResource>(Func<CancellationToken, Task<TResource>> resourceFactoryAsync, Func<TResource, CancellationToken, Task<IObservable<TSource>>> observableFactoryAsync) where TResource : IDisposable
  {
      var resources = Observable.FromAsync<TResource>(resourceFactoryAsync);

      return resources.SelectMany(resource =>
      {
          Func<TResource> resourceFactory = () => resource;

          Func<TResource, IObservable<TSource>> observableFactory = acquired =>
          {
              var pendingSequence = Observable.FromAsync<IObservable<TSource>>(ct => observableFactoryAsync(acquired, ct));
              return pendingSequence.Merge();
          };

          return Observable.Using<TSource, TResource>(resourceFactory, observableFactory);
      });
  }
  382. #endif
  383. #endregion
  384. }
  385. }