1
0

QueryLanguage.Time.cs 68 KB


  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Collections.Generic;
  3. using System.Reactive.Concurrency;
  4. using System.Reactive.Disposables;
  5. using System.Reactive.Subjects;
  6. namespace System.Reactive.Linq
  7. {
  8. #if !NO_PERF
  9. using ObservableImpl;
  10. #endif
  11. internal partial class QueryLanguage
  12. {
  13. #region + Buffer +
  14. #region TimeSpan only
  15. public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan)
  16. {
  17. return Buffer_<TSource>(source, timeSpan, timeSpan, SchedulerDefaults.TimeBasedOperations);
  18. }
  19. public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, IScheduler scheduler)
  20. {
  21. return Buffer_<TSource>(source, timeSpan, timeSpan, scheduler);
  22. }
  23. public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift)
  24. {
  25. return Buffer_<TSource>(source, timeSpan, timeShift, SchedulerDefaults.TimeBasedOperations);
  26. }
  27. public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
  28. {
  29. return Buffer_<TSource>(source, timeSpan, timeShift, scheduler);
  30. }
  31. private static IObservable<IList<TSource>> Buffer_<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
  32. {
  33. #if !NO_PERF
  34. return new Buffer<TSource>(source, timeSpan, timeShift, scheduler);
  35. #else
  36. return source.Window(timeSpan, timeShift, scheduler).SelectMany(Observable.ToList);
  37. #endif
  38. }
  39. #endregion
  40. #region TimeSpan + int
  41. public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count)
  42. {
  43. return Buffer_<TSource>(source, timeSpan, count, SchedulerDefaults.TimeBasedOperations);
  44. }
  45. public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
  46. {
  47. return Buffer_<TSource>(source, timeSpan, count, scheduler);
  48. }
  49. private static IObservable<IList<TSource>> Buffer_<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
  50. {
  51. #if !NO_PERF
  52. return new Buffer<TSource>(source, timeSpan, count, scheduler);
  53. #else
  54. return source.Window(timeSpan, count, scheduler).SelectMany(Observable.ToList);
  55. #endif
  56. }
  57. #endregion
  58. #endregion
  59. #region + Delay +
  60. #region TimeSpan
  61. public virtual IObservable<TSource> Delay<TSource>(IObservable<TSource> source, TimeSpan dueTime)
  62. {
  63. return Delay_<TSource>(source, dueTime, SchedulerDefaults.TimeBasedOperations);
  64. }
  65. public virtual IObservable<TSource> Delay<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
  66. {
  67. return Delay_<TSource>(source, dueTime, scheduler);
  68. }
  69. private static IObservable<TSource> Delay_<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
  70. {
  71. #if !NO_PERF
  72. return new Delay<TSource>(source, dueTime, scheduler);
  73. #else
  74. return new AnonymousObservable<TSource>(observer =>
  75. {
  76. var gate = new object();
  77. var q = new Queue<Timestamped<Notification<TSource>>>();
  78. var active = false;
  79. var running = false;
  80. var cancelable = new SerialDisposable();
  81. var exception = default(Exception);
  82. var subscription = source.Materialize().Timestamp(scheduler).Subscribe(notification =>
  83. {
  84. var shouldRun = false;
  85. lock (gate)
  86. {
  87. if (notification.Value.Kind == NotificationKind.OnError)
  88. {
  89. q.Clear();
  90. q.Enqueue(notification);
  91. exception = notification.Value.Exception;
  92. shouldRun = !running;
  93. }
  94. else
  95. {
  96. q.Enqueue(new Timestamped<Notification<TSource>>(notification.Value, notification.Timestamp.Add(dueTime)));
  97. shouldRun = !active;
  98. active = true;
  99. }
  100. }
  101. if (shouldRun)
  102. {
  103. if (exception != null)
  104. observer.OnError(exception);
  105. else
  106. {
  107. var d = new SingleAssignmentDisposable();
  108. cancelable.Disposable = d;
  109. d.Disposable = scheduler.Schedule(dueTime, self =>
  110. {
  111. lock (gate)
  112. {
  113. if (exception != null)
  114. return;
  115. running = true;
  116. }
  117. Notification<TSource> result;
  118. do
  119. {
  120. result = null;
  121. lock (gate)
  122. {
  123. if (q.Count > 0 && q.Peek().Timestamp.CompareTo(scheduler.Now) <= 0)
  124. result = q.Dequeue().Value;
  125. }
  126. if (result != null)
  127. result.Accept(observer);
  128. } while (result != null);
  129. var shouldRecurse = false;
  130. var recurseDueTime = TimeSpan.Zero;
  131. var e = default(Exception);
  132. lock (gate)
  133. {
  134. if (q.Count > 0)
  135. {
  136. shouldRecurse = true;
  137. recurseDueTime = TimeSpan.FromTicks(Math.Max(0, q.Peek().Timestamp.Subtract(scheduler.Now).Ticks));
  138. }
  139. else
  140. active = false;
  141. e = exception;
  142. running = false;
  143. }
  144. if (e != null)
  145. observer.OnError(e);
  146. else if (shouldRecurse)
  147. self(recurseDueTime);
  148. });
  149. }
  150. }
  151. });
  152. return new CompositeDisposable(subscription, cancelable);
  153. });
  154. #endif
  155. }
  156. #endregion
  157. #region DateTimeOffset
  158. public virtual IObservable<TSource> Delay<TSource>(IObservable<TSource> source, DateTimeOffset dueTime)
  159. {
  160. return Delay_<TSource>(source, dueTime, SchedulerDefaults.TimeBasedOperations);
  161. }
  162. public virtual IObservable<TSource> Delay<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
  163. {
  164. return Delay_<TSource>(source, dueTime, scheduler);
  165. }
  166. private static IObservable<TSource> Delay_<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
  167. {
  168. #if !NO_PERF
  169. return new Delay<TSource>(source, dueTime, scheduler);
  170. #else
  171. return Observable.Defer(() =>
  172. {
  173. var timeSpan = dueTime.Subtract(scheduler.Now);
  174. return Delay_<TSource>(source, timeSpan, scheduler);
  175. });
  176. #endif
  177. }
  178. #endregion
  179. #region Duration selector
  180. public virtual IObservable<TSource> Delay<TSource, TDelay>(IObservable<TSource> source, Func<TSource, IObservable<TDelay>> delayDurationSelector)
  181. {
  182. return Delay_<TSource, TDelay>(source, null, delayDurationSelector);
  183. }
  184. public virtual IObservable<TSource> Delay<TSource, TDelay>(IObservable<TSource> source, IObservable<TDelay> subscriptionDelay, Func<TSource, IObservable<TDelay>> delayDurationSelector)
  185. {
  186. return Delay_<TSource, TDelay>(source, subscriptionDelay, delayDurationSelector);
  187. }
  188. private static IObservable<TSource> Delay_<TSource, TDelay>(IObservable<TSource> source, IObservable<TDelay> subscriptionDelay, Func<TSource, IObservable<TDelay>> delayDurationSelector)
  189. {
  190. #if !NO_PERF
  191. return new Delay<TSource, TDelay>(source, subscriptionDelay, delayDurationSelector);
  192. #else
  193. return new AnonymousObservable<TSource>(observer =>
  194. {
  195. var delays = new CompositeDisposable();
  196. var gate = new object();
  197. var atEnd = false;
  198. var done = new Action(() =>
  199. {
  200. if (atEnd && delays.Count == 0)
  201. {
  202. observer.OnCompleted();
  203. }
  204. });
  205. var subscription = new SerialDisposable();
  206. var start = new Action(() =>
  207. {
  208. subscription.Disposable = source.Subscribe(
  209. x =>
  210. {
  211. var delay = default(IObservable<TDelay>);
  212. try
  213. {
  214. delay = delayDurationSelector(x);
  215. }
  216. catch (Exception error)
  217. {
  218. lock (gate)
  219. observer.OnError(error);
  220. return;
  221. }
  222. var d = new SingleAssignmentDisposable();
  223. delays.Add(d);
  224. d.Disposable = delay.Subscribe(
  225. _ =>
  226. {
  227. lock (gate)
  228. {
  229. observer.OnNext(x);
  230. delays.Remove(d);
  231. done();
  232. }
  233. },
  234. exception =>
  235. {
  236. lock (gate)
  237. observer.OnError(exception);
  238. },
  239. () =>
  240. {
  241. lock (gate)
  242. {
  243. observer.OnNext(x);
  244. delays.Remove(d);
  245. done();
  246. }
  247. }
  248. );
  249. },
  250. exception =>
  251. {
  252. lock (gate)
  253. {
  254. observer.OnError(exception);
  255. }
  256. },
  257. () =>
  258. {
  259. lock (gate)
  260. {
  261. atEnd = true;
  262. subscription.Dispose();
  263. done();
  264. }
  265. }
  266. );
  267. });
  268. if (subscriptionDelay == null)
  269. {
  270. start();
  271. }
  272. else
  273. {
  274. subscription.Disposable = subscriptionDelay.Subscribe(
  275. _ =>
  276. {
  277. start();
  278. },
  279. observer.OnError,
  280. start
  281. );
  282. }
  283. return new CompositeDisposable(subscription, delays);
  284. });
  285. #endif
  286. }
  287. #endregion
  288. #endregion
  289. #region + DelaySubscription +
  290. public virtual IObservable<TSource> DelaySubscription<TSource>(IObservable<TSource> source, TimeSpan dueTime)
  291. {
  292. return DelaySubscription_<TSource>(source, dueTime, SchedulerDefaults.TimeBasedOperations);
  293. }
  294. public virtual IObservable<TSource> DelaySubscription<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
  295. {
  296. return DelaySubscription_<TSource>(source, dueTime, scheduler);
  297. }
  298. private static IObservable<TSource> DelaySubscription_<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
  299. {
  300. #if !NO_PERF
  301. return new DelaySubscription<TSource>(source, dueTime, scheduler);
  302. #else
  303. return new AnonymousObservable<TSource>(observer =>
  304. {
  305. var d = new MultipleAssignmentDisposable();
  306. var dt = Normalize(dueTime);
  307. d.Disposable = scheduler.Schedule(dt, () =>
  308. {
  309. d.Disposable = source.Subscribe(observer);
  310. });
  311. return d;
  312. });
  313. #endif
  314. }
  315. public virtual IObservable<TSource> DelaySubscription<TSource>(IObservable<TSource> source, DateTimeOffset dueTime)
  316. {
  317. return DelaySubscription_<TSource>(source, dueTime, SchedulerDefaults.TimeBasedOperations);
  318. }
  319. public virtual IObservable<TSource> DelaySubscription<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
  320. {
  321. return DelaySubscription_<TSource>(source, dueTime, scheduler);
  322. }
  323. private static IObservable<TSource> DelaySubscription_<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
  324. {
  325. #if !NO_PERF
  326. return new DelaySubscription<TSource>(source, dueTime, scheduler);
  327. #else
  328. return new AnonymousObservable<TSource>(observer =>
  329. {
  330. var d = new MultipleAssignmentDisposable();
  331. d.Disposable = scheduler.Schedule(dueTime, () =>
  332. {
  333. d.Disposable = source.Subscribe(observer);
  334. });
  335. return d;
  336. });
  337. #endif
  338. }
  339. #endregion
  340. #region + Generate +
  341. public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector)
  342. {
  343. return Generate_<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, SchedulerDefaults.TimeBasedOperations);
  344. }
  345. public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IScheduler scheduler)
  346. {
  347. return Generate_<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, scheduler);
  348. }
  349. private static IObservable<TResult> Generate_<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IScheduler scheduler)
  350. {
  351. #if !NO_PERF
  352. return new Generate<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, scheduler);
  353. #else
  354. return new AnonymousObservable<TResult>(observer =>
  355. {
  356. var state = initialState;
  357. var first = true;
  358. var hasResult = false;
  359. var result = default(TResult);
  360. var time = default(TimeSpan);
  361. return scheduler.Schedule(TimeSpan.Zero, self =>
  362. {
  363. if (hasResult)
  364. observer.OnNext(result);
  365. try
  366. {
  367. if (first)
  368. first = false;
  369. else
  370. state = iterate(state);
  371. hasResult = condition(state);
  372. if (hasResult)
  373. {
  374. result = resultSelector(state);
  375. time = timeSelector(state);
  376. }
  377. }
  378. catch (Exception exception)
  379. {
  380. observer.OnError(exception);
  381. return;
  382. }
  383. if (hasResult)
  384. self(time);
  385. else
  386. observer.OnCompleted();
  387. });
  388. });
  389. #endif
  390. }
  391. public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector)
  392. {
  393. return Generate_<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, SchedulerDefaults.TimeBasedOperations);
  394. }
  395. public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector, IScheduler scheduler)
  396. {
  397. return Generate_<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, scheduler);
  398. }
  399. private static IObservable<TResult> Generate_<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector, IScheduler scheduler)
  400. {
  401. #if !NO_PERF
  402. return new Generate<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, scheduler);
  403. #else
  404. return new AnonymousObservable<TResult>(observer =>
  405. {
  406. var state = initialState;
  407. var first = true;
  408. var hasResult = false;
  409. var result = default(TResult);
  410. var time = default(DateTimeOffset);
  411. return scheduler.Schedule(scheduler.Now, self =>
  412. {
  413. if (hasResult)
  414. observer.OnNext(result);
  415. try
  416. {
  417. if (first)
  418. first = false;
  419. else
  420. state = iterate(state);
  421. hasResult = condition(state);
  422. if (hasResult)
  423. {
  424. result = resultSelector(state);
  425. time = timeSelector(state);
  426. }
  427. }
  428. catch (Exception exception)
  429. {
  430. observer.OnError(exception);
  431. return;
  432. }
  433. if (hasResult)
  434. self(time);
  435. else
  436. observer.OnCompleted();
  437. });
  438. });
  439. #endif
  440. }
  441. #endregion
  442. #region + Interval +
  443. public virtual IObservable<long> Interval(TimeSpan period)
  444. {
  445. return Timer_(period, period, SchedulerDefaults.TimeBasedOperations);
  446. }
  447. public virtual IObservable<long> Interval(TimeSpan period, IScheduler scheduler)
  448. {
  449. return Timer_(period, period, scheduler);
  450. }
  451. #endregion
  452. #region + Sample +
  453. public virtual IObservable<TSource> Sample<TSource>(IObservable<TSource> source, TimeSpan interval)
  454. {
  455. return Sample_<TSource>(source, interval, SchedulerDefaults.TimeBasedOperations);
  456. }
  457. public virtual IObservable<TSource> Sample<TSource>(IObservable<TSource> source, TimeSpan interval, IScheduler scheduler)
  458. {
  459. return Sample_<TSource>(source, interval, scheduler);
  460. }
  461. private static IObservable<TSource> Sample_<TSource>(IObservable<TSource> source, TimeSpan interval, IScheduler scheduler)
  462. {
  463. #if !NO_PERF
  464. return new Sample<TSource>(source, interval, scheduler);
  465. #else
  466. var sampler = Observable.Interval(interval, scheduler);
  467. return Sample_<TSource, long>(source, sampler);
  468. #endif
  469. }
  470. public virtual IObservable<TSource> Sample<TSource, TSample>(IObservable<TSource> source, IObservable<TSample> sampler)
  471. {
  472. return Sample_<TSource, TSample>(source, sampler);
  473. }
  474. private static IObservable<TSource> Sample_<TSource, TSample>(IObservable<TSource> source, IObservable<TSample> sampler)
  475. {
  476. #if !NO_PERF
  477. return new Sample<TSource, TSample>(source, sampler);
  478. #else
  479. return Combine(source, sampler, (IObserver<TSource> observer, IDisposable leftSubscription, IDisposable rightSubscription) =>
  480. {
  481. var value = default(Notification<TSource>);
  482. var atEnd = false;
  483. return new BinaryObserver<TSource, TSample>(
  484. newValue =>
  485. {
  486. switch (newValue.Kind)
  487. {
  488. case NotificationKind.OnNext:
  489. value = newValue;
  490. break;
  491. case NotificationKind.OnError:
  492. newValue.Accept(observer);
  493. break;
  494. case NotificationKind.OnCompleted:
  495. atEnd = true;
  496. break;
  497. }
  498. },
  499. _ =>
  500. {
  501. var myValue = value;
  502. value = null;
  503. if (myValue != null)
  504. myValue.Accept(observer);
  505. if (atEnd)
  506. observer.OnCompleted();
  507. });
  508. });
  509. #endif
  510. }
  511. #endregion
  512. #region + Skip +
  513. public virtual IObservable<TSource> Skip<TSource>(IObservable<TSource> source, TimeSpan duration)
  514. {
  515. return Skip_<TSource>(source, duration, SchedulerDefaults.TimeBasedOperations);
  516. }
  517. public virtual IObservable<TSource> Skip<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
  518. {
  519. return Skip_<TSource>(source, duration, scheduler);
  520. }
  521. private static IObservable<TSource> Skip_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
  522. {
  523. #if !NO_PERF
  524. var skip = source as Skip<TSource>;
  525. if (skip != null && skip._scheduler == scheduler)
  526. return skip.Omega(duration);
  527. return new Skip<TSource>(source, duration, scheduler);
  528. #else
  529. return new AnonymousObservable<TSource>(observer =>
  530. {
  531. var open = false;
  532. var t = scheduler.Schedule(duration, () => open = true);
  533. var d = source.Subscribe(
  534. x =>
  535. {
  536. if (open)
  537. observer.OnNext(x);
  538. },
  539. observer.OnError,
  540. observer.OnCompleted
  541. );
  542. return new CompositeDisposable(t, d);
  543. });
  544. #endif
  545. }
  546. #endregion
  547. #region + SkipLast +
  548. public virtual IObservable<TSource> SkipLast<TSource>(IObservable<TSource> source, TimeSpan duration)
  549. {
  550. return SkipLast_<TSource>(source, duration, SchedulerDefaults.TimeBasedOperations);
  551. }
  552. public virtual IObservable<TSource> SkipLast<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
  553. {
  554. return SkipLast_<TSource>(source, duration, scheduler);
  555. }
  556. private static IObservable<TSource> SkipLast_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
  557. {
  558. #if !NO_PERF
  559. return new SkipLast<TSource>(source, duration, scheduler);
  560. #else
  561. return new AnonymousObservable<TSource>(observer =>
  562. {
  563. var q = new Queue<System.Reactive.TimeInterval<TSource>>();
  564. var swp = scheduler.AsStopwatchProvider();
  565. var sw = swp != null ? swp.StartStopwatch() : new DefaultStopwatch();
  566. return source.Subscribe(
  567. x =>
  568. {
  569. var now = sw.Elapsed;
  570. q.Enqueue(new System.Reactive.TimeInterval<TSource>(x, now));
  571. while (q.Count > 0 && now - q.Peek().Interval >= duration)
  572. observer.OnNext(q.Dequeue().Value);
  573. },
  574. observer.OnError,
  575. () =>
  576. {
  577. var now = sw.Elapsed;
  578. while (q.Count > 0 && now - q.Peek().Interval >= duration)
  579. observer.OnNext(q.Dequeue().Value);
  580. observer.OnCompleted();
  581. }
  582. );
  583. });
  584. #endif
  585. }
  586. #endregion
  587. #region + SkipUntil +
  588. public virtual IObservable<TSource> SkipUntil<TSource>(IObservable<TSource> source, DateTimeOffset startTime)
  589. {
  590. return SkipUntil_<TSource>(source, startTime, SchedulerDefaults.TimeBasedOperations);
  591. }
  592. public virtual IObservable<TSource> SkipUntil<TSource>(IObservable<TSource> source, DateTimeOffset startTime, IScheduler scheduler)
  593. {
  594. return SkipUntil_<TSource>(source, startTime, scheduler);
  595. }
  596. private static IObservable<TSource> SkipUntil_<TSource>(IObservable<TSource> source, DateTimeOffset startTime, IScheduler scheduler)
  597. {
  598. #if !NO_PERF
  599. var skipUntil = source as SkipUntil<TSource>;
  600. if (skipUntil != null && skipUntil._scheduler == scheduler)
  601. return skipUntil.Omega(startTime);
  602. return new SkipUntil<TSource>(source, startTime, scheduler);
  603. #else
  604. return new AnonymousObservable<TSource>(observer =>
  605. {
  606. var open = false;
  607. var t = scheduler.Schedule(startTime, () => open = true);
  608. var d = source.Subscribe(
  609. x =>
  610. {
  611. if (open)
  612. observer.OnNext(x);
  613. },
  614. observer.OnError,
  615. observer.OnCompleted
  616. );
  617. return new CompositeDisposable(t, d);
  618. });
  619. #endif
  620. }
  621. #endregion
  622. #region + Take +
  623. public virtual IObservable<TSource> Take<TSource>(IObservable<TSource> source, TimeSpan duration)
  624. {
  625. return Take_<TSource>(source, duration, SchedulerDefaults.TimeBasedOperations);
  626. }
  627. public virtual IObservable<TSource> Take<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
  628. {
  629. return Take_<TSource>(source, duration, scheduler);
  630. }
  631. private static IObservable<TSource> Take_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
  632. {
  633. #if !NO_PERF
  634. var take = source as Take<TSource>;
  635. if (take != null && take._scheduler == scheduler)
  636. return take.Omega(duration);
  637. return new Take<TSource>(source, duration, scheduler);
  638. #else
  639. return new AnonymousObservable<TSource>(observer =>
  640. {
  641. var gate = new object();
  642. var t = scheduler.Schedule(duration, () =>
  643. {
  644. lock (gate)
  645. {
  646. observer.OnCompleted();
  647. }
  648. });
  649. var d = source.Synchronize(gate).Subscribe(observer);
  650. return new CompositeDisposable(t, d);
  651. });
  652. #endif
  653. }
  654. #endregion
  655. #region + TakeLast +
  656. public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, TimeSpan duration)
  657. {
  658. return TakeLast_<TSource>(source, duration, SchedulerDefaults.TimeBasedOperations, SchedulerDefaults.Iteration);
  659. }
  660. public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
  661. {
  662. return TakeLast_<TSource>(source, duration, scheduler, SchedulerDefaults.Iteration);
  663. }
  664. public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler timerScheduler, IScheduler loopScheduler)
  665. {
  666. return TakeLast_<TSource>(source, duration, timerScheduler, loopScheduler);
  667. }
  668. private static IObservable<TSource> TakeLast_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler timerScheduler, IScheduler loopScheduler)
  669. {
  670. #if !NO_PERF
  671. return new TakeLast<TSource>(source, duration, timerScheduler, loopScheduler);
  672. #else
  673. return new AnonymousObservable<TSource>(observer =>
  674. {
  675. var q = new Queue<System.Reactive.TimeInterval<TSource>>();
  676. var swp = timerScheduler.AsStopwatchProvider();
  677. var sw = swp != null ? swp.StartStopwatch() : new DefaultStopwatch();
  678. var trim = new Action<TimeSpan>(now =>
  679. {
  680. while (q.Count > 0 && now - q.Peek().Interval >= duration)
  681. q.Dequeue();
  682. });
  683. var g = new CompositeDisposable();
  684. g.Add(source.Subscribe(
  685. x =>
  686. {
  687. var now = sw.Elapsed;
  688. q.Enqueue(new System.Reactive.TimeInterval<TSource>(x, now));
  689. trim(now);
  690. },
  691. observer.OnError,
  692. () =>
  693. {
  694. var now = sw.Elapsed;
  695. trim(now);
  696. g.Add(loopScheduler.Schedule(rec =>
  697. {
  698. if (q.Count > 0)
  699. {
  700. observer.OnNext(q.Dequeue().Value);
  701. rec();
  702. }
  703. else
  704. {
  705. observer.OnCompleted();
  706. }
  707. }));
  708. }
  709. ));
  710. return g;
  711. });
  712. #endif
  713. }
  714. public virtual IObservable<IList<TSource>> TakeLastBuffer<TSource>(IObservable<TSource> source, TimeSpan duration)
  715. {
  716. return TakeLastBuffer_<TSource>(source, duration, SchedulerDefaults.TimeBasedOperations);
  717. }
  718. public virtual IObservable<IList<TSource>> TakeLastBuffer<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
  719. {
  720. return TakeLastBuffer_<TSource>(source, duration, scheduler);
  721. }
  722. private static IObservable<IList<TSource>> TakeLastBuffer_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
  723. {
  724. #if !NO_PERF
  725. return new TakeLastBuffer<TSource>(source, duration, scheduler);
  726. #else
  727. return new AnonymousObservable<IList<TSource>>(observer =>
  728. {
  729. var q = new Queue<System.Reactive.TimeInterval<TSource>>();
  730. var swp = scheduler.AsStopwatchProvider();
  731. var sw = swp != null ? swp.StartStopwatch() : new DefaultStopwatch();
  732. return source.Subscribe(
  733. x =>
  734. {
  735. var now = sw.Elapsed;
  736. q.Enqueue(new System.Reactive.TimeInterval<TSource>(x, now));
  737. while (q.Count > 0 && now - q.Peek().Interval >= duration)
  738. q.Dequeue();
  739. },
  740. observer.OnError,
  741. () =>
  742. {
  743. var now = sw.Elapsed;
  744. var res = new List<TSource>();
  745. while (q.Count > 0)
  746. {
  747. var next = q.Dequeue();
  748. if (now - next.Interval <= duration)
  749. res.Add(next.Value);
  750. }
  751. observer.OnNext(res);
  752. observer.OnCompleted();
  753. }
  754. );
  755. });
  756. #endif
  757. }
  758. #endregion
  759. #region + TakeUntil +
  760. public virtual IObservable<TSource> TakeUntil<TSource>(IObservable<TSource> source, DateTimeOffset endTime)
  761. {
  762. return TakeUntil_<TSource>(source, endTime, SchedulerDefaults.TimeBasedOperations);
  763. }
  764. public virtual IObservable<TSource> TakeUntil<TSource>(IObservable<TSource> source, DateTimeOffset endTime, IScheduler scheduler)
  765. {
  766. return TakeUntil_<TSource>(source, endTime, scheduler);
  767. }
  768. private static IObservable<TSource> TakeUntil_<TSource>(IObservable<TSource> source, DateTimeOffset endTime, IScheduler scheduler)
  769. {
  770. #if !NO_PERF
  771. var takeUntil = source as TakeUntil<TSource>;
  772. if (takeUntil != null && takeUntil._scheduler == scheduler)
  773. return takeUntil.Omega(endTime);
  774. return new TakeUntil<TSource>(source, endTime, scheduler);
  775. #else
  776. return new AnonymousObservable<TSource>(observer =>
  777. {
  778. var gate = new object();
  779. var t = scheduler.Schedule(endTime, () =>
  780. {
  781. lock (gate)
  782. {
  783. observer.OnCompleted();
  784. }
  785. });
  786. var d = source.Synchronize(gate).Subscribe(observer);
  787. return new CompositeDisposable(t, d);
  788. });
  789. #endif
  790. }
  791. #endregion
  792. #region + Throttle +
  793. public virtual IObservable<TSource> Throttle<TSource>(IObservable<TSource> source, TimeSpan dueTime)
  794. {
  795. return Throttle_<TSource>(source, dueTime, SchedulerDefaults.TimeBasedOperations);
  796. }
  797. public virtual IObservable<TSource> Throttle<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
  798. {
  799. return Throttle_<TSource>(source, dueTime, scheduler);
  800. }
  801. private static IObservable<TSource> Throttle_<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
  802. {
  803. #if !NO_PERF
  804. return new Throttle<TSource>(source, dueTime, scheduler);
  805. #else
  806. return new AnonymousObservable<TSource>(observer =>
  807. {
  808. var gate = new object();
  809. var value = default(TSource);
  810. var hasValue = false;
  811. var cancelable = new SerialDisposable();
  812. var id = 0UL;
  813. var subscription = source.Subscribe(x =>
  814. {
  815. ulong currentid;
  816. lock (gate)
  817. {
  818. hasValue = true;
  819. value = x;
  820. id = unchecked(id + 1);
  821. currentid = id;
  822. }
  823. var d = new SingleAssignmentDisposable();
  824. cancelable.Disposable = d;
  825. d.Disposable = scheduler.Schedule(dueTime, () =>
  826. {
  827. lock (gate)
  828. {
  829. if (hasValue && id == currentid)
  830. observer.OnNext(value);
  831. hasValue = false;
  832. }
  833. });
  834. },
  835. exception =>
  836. {
  837. cancelable.Dispose();
  838. lock (gate)
  839. {
  840. observer.OnError(exception);
  841. hasValue = false;
  842. id = unchecked(id + 1);
  843. }
  844. },
  845. () =>
  846. {
  847. cancelable.Dispose();
  848. lock (gate)
  849. {
  850. if (hasValue)
  851. observer.OnNext(value);
  852. observer.OnCompleted();
  853. hasValue = false;
  854. id = unchecked(id + 1);
  855. }
  856. });
  857. return new CompositeDisposable(subscription, cancelable);
  858. });
  859. #endif
  860. }
  861. public virtual IObservable<TSource> Throttle<TSource, TThrottle>(IObservable<TSource> source, Func<TSource, IObservable<TThrottle>> throttleDurationSelector)
  862. {
  863. #if !NO_PERF
  864. return new Throttle<TSource, TThrottle>(source, throttleDurationSelector);
  865. #else
  866. return new AnonymousObservable<TSource>(observer =>
  867. {
  868. var gate = new object();
  869. var value = default(TSource);
  870. var hasValue = false;
  871. var cancelable = new SerialDisposable();
  872. var id = 0UL;
  873. var subscription = source.Subscribe(
  874. x =>
  875. {
  876. var throttle = default(IObservable<TThrottle>);
  877. try
  878. {
  879. throttle = throttleDurationSelector(x);
  880. }
  881. catch (Exception error)
  882. {
  883. lock (gate)
  884. observer.OnError(error);
  885. return;
  886. }
  887. ulong currentid;
  888. lock (gate)
  889. {
  890. hasValue = true;
  891. value = x;
  892. id = unchecked(id + 1);
  893. currentid = id;
  894. }
  895. var d = new SingleAssignmentDisposable();
  896. cancelable.Disposable = d;
  897. d.Disposable = throttle.Subscribe(
  898. _ =>
  899. {
  900. lock (gate)
  901. {
  902. if (hasValue && id == currentid)
  903. observer.OnNext(value);
  904. hasValue = false;
  905. d.Dispose();
  906. }
  907. },
  908. exception =>
  909. {
  910. lock (gate)
  911. {
  912. observer.OnError(exception);
  913. }
  914. },
  915. () =>
  916. {
  917. lock (gate)
  918. {
  919. if (hasValue && id == currentid)
  920. observer.OnNext(value);
  921. hasValue = false;
  922. d.Dispose();
  923. }
  924. }
  925. );
  926. },
  927. exception =>
  928. {
  929. cancelable.Dispose();
  930. lock (gate)
  931. {
  932. observer.OnError(exception);
  933. hasValue = false;
  934. id = unchecked(id + 1);
  935. }
  936. },
  937. () =>
  938. {
  939. cancelable.Dispose();
  940. lock (gate)
  941. {
  942. if (hasValue)
  943. observer.OnNext(value);
  944. observer.OnCompleted();
  945. hasValue = false;
  946. id = unchecked(id + 1);
  947. }
  948. });
  949. return new CompositeDisposable(subscription, cancelable);
  950. });
  951. #endif
  952. }
  953. #endregion
  954. #region + TimeInterval +
  955. public virtual IObservable<System.Reactive.TimeInterval<TSource>> TimeInterval<TSource>(IObservable<TSource> source)
  956. {
  957. return TimeInterval_<TSource>(source, SchedulerDefaults.TimeBasedOperations);
  958. }
  959. public virtual IObservable<System.Reactive.TimeInterval<TSource>> TimeInterval<TSource>(IObservable<TSource> source, IScheduler scheduler)
  960. {
  961. return TimeInterval_<TSource>(source, scheduler);
  962. }
  963. #if !NO_PERF
  964. private static IObservable<System.Reactive.TimeInterval<TSource>> TimeInterval_<TSource>(IObservable<TSource> source, IScheduler scheduler)
  965. {
  966. return new TimeInterval<TSource>(source, scheduler);
  967. }
  968. #else
  969. private IObservable<System.Reactive.TimeInterval<TSource>> TimeInterval_<TSource>(IObservable<TSource> source, IScheduler scheduler)
  970. {
  971. return Defer(() =>
  972. {
  973. var last = scheduler.Now;
  974. return source.Select(x =>
  975. {
  976. var now = scheduler.Now;
  977. var span = now.Subtract(last);
  978. last = now;
  979. return new System.Reactive.TimeInterval<TSource>(x, span);
  980. });
  981. });
  982. }
  983. #endif
  984. #endregion
  985. #region + Timeout +
  986. #region TimeSpan
  987. public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, TimeSpan dueTime)
  988. {
  989. return Timeout_<TSource>(source, dueTime, Observable.Throw<TSource>(new TimeoutException()), SchedulerDefaults.TimeBasedOperations);
  990. }
  991. public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
  992. {
  993. return Timeout_<TSource>(source, dueTime, Observable.Throw<TSource>(new TimeoutException()), scheduler);
  994. }
  995. public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, TimeSpan dueTime, IObservable<TSource> other)
  996. {
  997. return Timeout_<TSource>(source, dueTime, other, SchedulerDefaults.TimeBasedOperations);
  998. }
  999. public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, TimeSpan dueTime, IObservable<TSource> other, IScheduler scheduler)
  1000. {
  1001. return Timeout_<TSource>(source, dueTime, other, scheduler);
  1002. }
  1003. private static IObservable<TSource> Timeout_<TSource>(IObservable<TSource> source, TimeSpan dueTime, IObservable<TSource> other, IScheduler scheduler)
  1004. {
  1005. #if !NO_PERF
  1006. return new Timeout<TSource>(source, dueTime, other, scheduler);
  1007. #else
  1008. return new AnonymousObservable<TSource>(observer =>
  1009. {
  1010. var subscription = new SerialDisposable();
  1011. var timer = new SerialDisposable();
  1012. var original = new SingleAssignmentDisposable();
  1013. subscription.Disposable = original;
  1014. var gate = new object();
  1015. var id = 0UL;
  1016. var switched = false;
  1017. Action createTimer = () =>
  1018. {
  1019. var myid = id;
  1020. timer.Disposable = scheduler.Schedule(dueTime, () =>
  1021. {
  1022. var timerWins = false;
  1023. lock (gate)
  1024. {
  1025. switched = (id == myid);
  1026. timerWins = switched;
  1027. }
  1028. if (timerWins)
  1029. subscription.Disposable = other.Subscribe(observer);
  1030. });
  1031. };
  1032. createTimer();
  1033. original.Disposable = source.Subscribe(
  1034. x =>
  1035. {
  1036. var onNextWins = false;
  1037. lock (gate)
  1038. {
  1039. onNextWins = !switched;
  1040. if (onNextWins)
  1041. {
  1042. id = unchecked(id + 1);
  1043. }
  1044. }
  1045. if (onNextWins)
  1046. {
  1047. observer.OnNext(x);
  1048. createTimer();
  1049. }
  1050. },
  1051. exception =>
  1052. {
  1053. var onErrorWins = false;
  1054. lock (gate)
  1055. {
  1056. onErrorWins = !switched;
  1057. if (onErrorWins)
  1058. {
  1059. id = unchecked(id + 1);
  1060. }
  1061. }
  1062. if (onErrorWins)
  1063. observer.OnError(exception);
  1064. },
  1065. () =>
  1066. {
  1067. var onCompletedWins = false;
  1068. lock (gate)
  1069. {
  1070. onCompletedWins = !switched;
  1071. if (onCompletedWins)
  1072. {
  1073. id = unchecked(id + 1);
  1074. }
  1075. }
  1076. if (onCompletedWins)
  1077. observer.OnCompleted();
  1078. });
  1079. return new CompositeDisposable(subscription, timer);
  1080. });
  1081. #endif
  1082. }
  1083. #endregion
  1084. #region DateTimeOffset
  1085. public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, DateTimeOffset dueTime)
  1086. {
  1087. return Timeout_<TSource>(source, dueTime, Observable.Throw<TSource>(new TimeoutException()), SchedulerDefaults.TimeBasedOperations);
  1088. }
  1089. public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
  1090. {
  1091. return Timeout_<TSource>(source, dueTime, Observable.Throw<TSource>(new TimeoutException()), scheduler);
  1092. }
  1093. public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IObservable<TSource> other)
  1094. {
  1095. return Timeout_<TSource>(source, dueTime, other, SchedulerDefaults.TimeBasedOperations);
  1096. }
  1097. public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IObservable<TSource> other, IScheduler scheduler)
  1098. {
  1099. return Timeout_<TSource>(source, dueTime, other, scheduler);
  1100. }
  1101. private static IObservable<TSource> Timeout_<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IObservable<TSource> other, IScheduler scheduler)
  1102. {
  1103. #if !NO_PERF
  1104. return new Timeout<TSource>(source, dueTime, other, scheduler);
  1105. #else
  1106. return new AnonymousObservable<TSource>(observer =>
  1107. {
  1108. var subscription = new SerialDisposable();
  1109. var original = new SingleAssignmentDisposable();
  1110. subscription.Disposable = original;
  1111. var gate = new object();
  1112. var switched = false;
  1113. var timer = scheduler.Schedule(dueTime, () =>
  1114. {
  1115. var timerWins = false;
  1116. lock (gate)
  1117. {
  1118. timerWins = !switched;
  1119. switched = true;
  1120. }
  1121. if (timerWins)
  1122. subscription.Disposable = other.Subscribe(observer);
  1123. });
  1124. original.Disposable = source.Subscribe(
  1125. x =>
  1126. {
  1127. lock (gate)
  1128. {
  1129. if (!switched)
  1130. observer.OnNext(x);
  1131. }
  1132. },
  1133. exception =>
  1134. {
  1135. var onErrorWins = false;
  1136. lock (gate)
  1137. {
  1138. onErrorWins = !switched;
  1139. switched = true;
  1140. }
  1141. if (onErrorWins)
  1142. observer.OnError(exception);
  1143. },
  1144. () =>
  1145. {
  1146. var onCompletedWins = false;
  1147. lock (gate)
  1148. {
  1149. onCompletedWins = !switched;
  1150. switched = true;
  1151. }
  1152. if (onCompletedWins)
  1153. observer.OnCompleted();
  1154. });
  1155. return new CompositeDisposable(subscription, timer);
  1156. });
  1157. #endif
  1158. }
  1159. #endregion
  1160. #region Duration selector
  1161. public virtual IObservable<TSource> Timeout<TSource, TTimeout>(IObservable<TSource> source, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector)
  1162. {
  1163. return Timeout_<TSource, TTimeout>(source, Observable.Never<TTimeout>(), timeoutDurationSelector, Observable.Throw<TSource>(new TimeoutException()));
  1164. }
  1165. public virtual IObservable<TSource> Timeout<TSource, TTimeout>(IObservable<TSource> source, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector, IObservable<TSource> other)
  1166. {
  1167. return Timeout_<TSource, TTimeout>(source, Observable.Never<TTimeout>(), timeoutDurationSelector, other);
  1168. }
  1169. public virtual IObservable<TSource> Timeout<TSource, TTimeout>(IObservable<TSource> source, IObservable<TTimeout> firstTimeout, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector)
  1170. {
  1171. return Timeout_<TSource, TTimeout>(source, firstTimeout, timeoutDurationSelector, Observable.Throw<TSource>(new TimeoutException()));
  1172. }
  1173. public virtual IObservable<TSource> Timeout<TSource, TTimeout>(IObservable<TSource> source, IObservable<TTimeout> firstTimeout, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector, IObservable<TSource> other)
  1174. {
  1175. return Timeout_<TSource, TTimeout>(source, firstTimeout, timeoutDurationSelector, other);
  1176. }
  1177. private static IObservable<TSource> Timeout_<TSource, TTimeout>(IObservable<TSource> source, IObservable<TTimeout> firstTimeout, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector, IObservable<TSource> other)
  1178. {
  1179. #if !NO_PERF
  1180. return new Timeout<TSource, TTimeout>(source, firstTimeout, timeoutDurationSelector, other);
  1181. #else
  1182. return new AnonymousObservable<TSource>(observer =>
  1183. {
  1184. var subscription = new SerialDisposable();
  1185. var timer = new SerialDisposable();
  1186. var original = new SingleAssignmentDisposable();
  1187. subscription.Disposable = original;
  1188. var gate = new object();
  1189. var id = 0UL;
  1190. var switched = false;
  1191. Action<IObservable<TTimeout>> setTimer = timeout =>
  1192. {
  1193. var myid = id;
  1194. Func<bool> timerWins = () =>
  1195. {
  1196. var res = false;
  1197. lock (gate)
  1198. {
  1199. switched = (id == myid);
  1200. res = switched;
  1201. }
  1202. return res;
  1203. };
  1204. var d = new SingleAssignmentDisposable();
  1205. timer.Disposable = d;
  1206. d.Disposable = timeout.Subscribe(
  1207. _ =>
  1208. {
  1209. if (timerWins())
  1210. subscription.Disposable = other.Subscribe(observer);
  1211. d.Dispose();
  1212. },
  1213. error =>
  1214. {
  1215. if (timerWins())
  1216. observer.OnError(error);
  1217. },
  1218. () =>
  1219. {
  1220. if (timerWins())
  1221. subscription.Disposable = other.Subscribe(observer);
  1222. }
  1223. );
  1224. };
  1225. setTimer(firstTimeout);
  1226. Func<bool> observerWins = () =>
  1227. {
  1228. var res = false;
  1229. lock (gate)
  1230. {
  1231. res = !switched;
  1232. if (res)
  1233. {
  1234. id = unchecked(id + 1);
  1235. }
  1236. }
  1237. return res;
  1238. };
  1239. original.Disposable = source.Subscribe(
  1240. x =>
  1241. {
  1242. if (observerWins())
  1243. {
  1244. observer.OnNext(x);
  1245. var timeout = default(IObservable<TTimeout>);
  1246. try
  1247. {
  1248. timeout = timeoutDurationSelector(x);
  1249. }
  1250. catch (Exception error)
  1251. {
  1252. observer.OnError(error);
  1253. return;
  1254. }
  1255. setTimer(timeout);
  1256. }
  1257. },
  1258. exception =>
  1259. {
  1260. if (observerWins())
  1261. observer.OnError(exception);
  1262. },
  1263. () =>
  1264. {
  1265. if (observerWins())
  1266. observer.OnCompleted();
  1267. }
  1268. );
  1269. return new CompositeDisposable(subscription, timer);
  1270. });
  1271. #endif
  1272. }
  1273. #endregion
  1274. #endregion
  1275. #region + Timer +
  1276. public virtual IObservable<long> Timer(TimeSpan dueTime)
  1277. {
  1278. return Timer_(dueTime, SchedulerDefaults.TimeBasedOperations);
  1279. }
  1280. public virtual IObservable<long> Timer(DateTimeOffset dueTime)
  1281. {
  1282. return Timer_(dueTime, SchedulerDefaults.TimeBasedOperations);
  1283. }
  1284. public virtual IObservable<long> Timer(TimeSpan dueTime, TimeSpan period)
  1285. {
  1286. return Timer_(dueTime, period, SchedulerDefaults.TimeBasedOperations);
  1287. }
  1288. public virtual IObservable<long> Timer(DateTimeOffset dueTime, TimeSpan period)
  1289. {
  1290. return Timer_(dueTime, period, SchedulerDefaults.TimeBasedOperations);
  1291. }
  1292. public virtual IObservable<long> Timer(TimeSpan dueTime, IScheduler scheduler)
  1293. {
  1294. return Timer_(dueTime, scheduler);
  1295. }
  1296. public virtual IObservable<long> Timer(DateTimeOffset dueTime, IScheduler scheduler)
  1297. {
  1298. return Timer_(dueTime, scheduler);
  1299. }
  1300. public virtual IObservable<long> Timer(TimeSpan dueTime, TimeSpan period, IScheduler scheduler)
  1301. {
  1302. return Timer_(dueTime, period, scheduler);
  1303. }
  1304. public virtual IObservable<long> Timer(DateTimeOffset dueTime, TimeSpan period, IScheduler scheduler)
  1305. {
  1306. return Timer_(dueTime, period, scheduler);
  1307. }
  1308. private static IObservable<long> Timer_(TimeSpan dueTime, IScheduler scheduler)
  1309. {
  1310. #if !NO_PERF
  1311. return new Timer(dueTime, null, scheduler);
  1312. #else
  1313. var d = Normalize(dueTime);
  1314. return new AnonymousObservable<long>(observer =>
  1315. scheduler.Schedule(d, () =>
  1316. {
  1317. observer.OnNext(0);
  1318. observer.OnCompleted();
  1319. }));
  1320. #endif
  1321. }
  1322. #if !NO_PERF
  1323. private static IObservable<long> Timer_(TimeSpan dueTime, TimeSpan period, IScheduler scheduler)
  1324. {
  1325. return new Timer(dueTime, period, scheduler);
  1326. }
  1327. #else
  1328. private IObservable<long> Timer_(TimeSpan dueTime, TimeSpan period, IScheduler scheduler)
  1329. {
  1330. var p = Normalize(period);
  1331. return Defer(() => Timer(scheduler.Now + dueTime, p, scheduler));
  1332. }
  1333. #endif
  1334. private static IObservable<long> Timer_(DateTimeOffset dueTime, IScheduler scheduler)
  1335. {
  1336. #if !NO_PERF
  1337. return new Timer(dueTime, null, scheduler);
  1338. #else
  1339. return new AnonymousObservable<long>(observer =>
  1340. scheduler.Schedule(dueTime, () =>
  1341. {
  1342. observer.OnNext(0);
  1343. observer.OnCompleted();
  1344. }));
  1345. #endif
  1346. }
  1347. private static IObservable<long> Timer_(DateTimeOffset dueTime, TimeSpan period, IScheduler scheduler)
  1348. {
  1349. #if !NO_PERF
  1350. return new Timer(dueTime, period, scheduler);
  1351. #else
  1352. var p = Normalize(period);
  1353. return new AnonymousObservable<long>(observer =>
  1354. {
  1355. var d = dueTime;
  1356. var count = 0L;
  1357. return scheduler.Schedule(d, self =>
  1358. {
  1359. if (p > TimeSpan.Zero)
  1360. {
  1361. var now = scheduler.Now;
  1362. d = d + p;
  1363. if (d <= now)
  1364. d = now + p;
  1365. }
  1366. observer.OnNext(count);
  1367. count = unchecked(count + 1);
  1368. self(d);
  1369. });
  1370. });
  1371. #endif
  1372. }
  1373. #endregion
  1374. #region + Timestamp +
  1375. public virtual IObservable<Timestamped<TSource>> Timestamp<TSource>(IObservable<TSource> source)
  1376. {
  1377. return Timestamp_<TSource>(source, SchedulerDefaults.TimeBasedOperations);
  1378. }
  1379. public virtual IObservable<Timestamped<TSource>> Timestamp<TSource>(IObservable<TSource> source, IScheduler scheduler)
  1380. {
  1381. return Timestamp_<TSource>(source, scheduler);
  1382. }
  1383. private static IObservable<Timestamped<TSource>> Timestamp_<TSource>(IObservable<TSource> source, IScheduler scheduler)
  1384. {
  1385. #if !NO_PERF
  1386. return new Timestamp<TSource>(source, scheduler);
  1387. #else
  1388. return source.Select(x => new Timestamped<TSource>(x, scheduler.Now));
  1389. #endif
  1390. }
  1391. #endregion
  1392. #region + Window +
  1393. #region TimeSpan only
  1394. public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan)
  1395. {
  1396. return Window_<TSource>(source, timeSpan, timeSpan, SchedulerDefaults.TimeBasedOperations);
  1397. }
  1398. public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, IScheduler scheduler)
  1399. {
  1400. return Window_<TSource>(source, timeSpan, timeSpan, scheduler);
  1401. }
  1402. public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift)
  1403. {
  1404. return Window_<TSource>(source, timeSpan, timeShift, SchedulerDefaults.TimeBasedOperations);
  1405. }
  1406. public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
  1407. {
  1408. return Window_<TSource>(source, timeSpan, timeShift, scheduler);
  1409. }
  1410. private static IObservable<IObservable<TSource>> Window_<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
  1411. {
  1412. #if !NO_PERF
  1413. return new Window<TSource>(source, timeSpan, timeShift, scheduler);
  1414. #else
  1415. return new AnonymousObservable<IObservable<TSource>>(observer =>
  1416. {
  1417. var totalTime = TimeSpan.Zero;
  1418. var nextShift = timeShift;
  1419. var nextSpan = timeSpan;
  1420. var gate = new object();
  1421. var q = new Queue<ISubject<TSource>>();
  1422. var timerD = new SerialDisposable();
  1423. var groupDisposable = new CompositeDisposable(2) { timerD };
  1424. var refCountDisposable = new RefCountDisposable(groupDisposable);
  1425. var createTimer = default(Action);
  1426. createTimer = () =>
  1427. {
  1428. var m = new SingleAssignmentDisposable();
  1429. timerD.Disposable = m;
  1430. var isSpan = false;
  1431. var isShift = false;
  1432. if (nextSpan == nextShift)
  1433. {
  1434. isSpan = true;
  1435. isShift = true;
  1436. }
  1437. else if (nextSpan < nextShift)
  1438. isSpan = true;
  1439. else
  1440. isShift = true;
  1441. var newTotalTime = isSpan ? nextSpan : nextShift;
  1442. var ts = newTotalTime - totalTime;
  1443. totalTime = newTotalTime;
  1444. if (isSpan)
  1445. nextSpan += timeShift;
  1446. if (isShift)
  1447. nextShift += timeShift;
  1448. m.Disposable = scheduler.Schedule(ts, () =>
  1449. {
  1450. lock (gate)
  1451. {
  1452. if (isShift)
  1453. {
  1454. var s = new Subject<TSource>();
  1455. q.Enqueue(s);
  1456. observer.OnNext(s.AddRef(refCountDisposable));
  1457. }
  1458. if (isSpan)
  1459. {
  1460. var s = q.Dequeue();
  1461. s.OnCompleted();
  1462. }
  1463. }
  1464. createTimer();
  1465. });
  1466. };
  1467. q.Enqueue(new Subject<TSource>());
  1468. observer.OnNext(q.Peek().AddRef(refCountDisposable));
  1469. createTimer();
  1470. groupDisposable.Add(source.Subscribe(
  1471. x =>
  1472. {
  1473. lock (gate)
  1474. {
  1475. foreach (var s in q)
  1476. s.OnNext(x);
  1477. }
  1478. },
  1479. exception =>
  1480. {
  1481. lock (gate)
  1482. {
  1483. foreach (var s in q)
  1484. s.OnError(exception);
  1485. observer.OnError(exception);
  1486. }
  1487. },
  1488. () =>
  1489. {
  1490. lock (gate)
  1491. {
  1492. foreach (var s in q)
  1493. s.OnCompleted();
  1494. observer.OnCompleted();
  1495. }
  1496. }
  1497. ));
  1498. return refCountDisposable;
  1499. });
  1500. #endif
  1501. }
  1502. #endregion
  1503. #region TimeSpan + int
  1504. public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count)
  1505. {
  1506. return Window_<TSource>(source, timeSpan, count, SchedulerDefaults.TimeBasedOperations);
  1507. }
  1508. public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
  1509. {
  1510. return Window_<TSource>(source, timeSpan, count, scheduler);
  1511. }
  1512. private static IObservable<IObservable<TSource>> Window_<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
  1513. {
  1514. #if !NO_PERF
  1515. return new Window<TSource>(source, timeSpan, count, scheduler);
  1516. #else
  1517. return new AnonymousObservable<IObservable<TSource>>(observer =>
  1518. {
  1519. var gate = new object();
  1520. var s = default(ISubject<TSource>);
  1521. var n = 0;
  1522. var windowId = 0;
  1523. var timerD = new SerialDisposable();
  1524. var groupDisposable = new CompositeDisposable(2) { timerD };
  1525. var refCountDisposable = new RefCountDisposable(groupDisposable);
  1526. var createTimer = default(Action<int>);
  1527. createTimer = id =>
  1528. {
  1529. var m = new SingleAssignmentDisposable();
  1530. timerD.Disposable = m;
  1531. m.Disposable = scheduler.Schedule(timeSpan, () =>
  1532. {
  1533. var newId = 0;
  1534. lock (gate)
  1535. {
  1536. if (id != windowId)
  1537. return;
  1538. n = 0;
  1539. newId = ++windowId;
  1540. s.OnCompleted();
  1541. s = new Subject<TSource>();
  1542. observer.OnNext(s.AddRef(refCountDisposable));
  1543. }
  1544. createTimer(newId);
  1545. });
  1546. };
  1547. s = new Subject<TSource>();
  1548. observer.OnNext(s.AddRef(refCountDisposable));
  1549. createTimer(0);
  1550. groupDisposable.Add(source.Subscribe(
  1551. x =>
  1552. {
  1553. var newWindow = false;
  1554. var newId = 0;
  1555. lock (gate)
  1556. {
  1557. s.OnNext(x);
  1558. n++;
  1559. if (n == count)
  1560. {
  1561. newWindow = true;
  1562. n = 0;
  1563. newId = ++windowId;
  1564. s.OnCompleted();
  1565. s = new Subject<TSource>();
  1566. observer.OnNext(s.AddRef(refCountDisposable));
  1567. }
  1568. }
  1569. if (newWindow)
  1570. createTimer(newId);
  1571. },
  1572. exception =>
  1573. {
  1574. lock (gate)
  1575. {
  1576. s.OnError(exception);
  1577. observer.OnError(exception);
  1578. }
  1579. },
  1580. () =>
  1581. {
  1582. lock (gate)
  1583. {
  1584. s.OnCompleted();
  1585. observer.OnCompleted();
  1586. }
  1587. }
  1588. ));
  1589. return refCountDisposable;
  1590. });
  1591. #endif
  1592. }
  1593. #endregion
  1594. #endregion
  1595. #region |> Helpers <|
  1596. #if NO_PERF
  1597. private static TimeSpan Normalize(TimeSpan timeSpan)
  1598. {
  1599. if (timeSpan.CompareTo(TimeSpan.Zero) < 0)
  1600. return TimeSpan.Zero;
  1601. return timeSpan;
  1602. }
  1603. #endif
  1604. #endregion
  1605. }
  1606. }