QueryLanguage.Time.cs 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Disposables;
  7. using System.Reactive.Subjects;
  8. namespace System.Reactive.Linq
  9. {
  10. using ObservableImpl;
  11. internal partial class QueryLanguage
  12. {
  13. #region + Buffer +
  14. #region TimeSpan only
  15. public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan)
  16. {
  17. return Buffer_<TSource>(source, timeSpan, timeSpan, SchedulerDefaults.TimeBasedOperations);
  18. }
  19. public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, IScheduler scheduler)
  20. {
  21. return Buffer_<TSource>(source, timeSpan, timeSpan, scheduler);
  22. }
  23. public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift)
  24. {
  25. return Buffer_<TSource>(source, timeSpan, timeShift, SchedulerDefaults.TimeBasedOperations);
  26. }
  27. public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
  28. {
  29. return Buffer_<TSource>(source, timeSpan, timeShift, scheduler);
  30. }
  31. private static IObservable<IList<TSource>> Buffer_<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
  32. {
  33. return new Buffer<TSource>(source, timeSpan, timeShift, scheduler);
  34. }
  35. #endregion
  36. #region TimeSpan + int
  37. public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count)
  38. {
  39. return Buffer_<TSource>(source, timeSpan, count, SchedulerDefaults.TimeBasedOperations);
  40. }
  41. public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
  42. {
  43. return Buffer_<TSource>(source, timeSpan, count, scheduler);
  44. }
  45. private static IObservable<IList<TSource>> Buffer_<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
  46. {
  47. return new Buffer<TSource>(source, timeSpan, count, scheduler);
  48. }
  49. #endregion
  50. #endregion
  51. #region + Delay +
  52. #region TimeSpan
  53. public virtual IObservable<TSource> Delay<TSource>(IObservable<TSource> source, TimeSpan dueTime)
  54. {
  55. return Delay_<TSource>(source, dueTime, SchedulerDefaults.TimeBasedOperations);
  56. }
  57. public virtual IObservable<TSource> Delay<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
  58. {
  59. return Delay_<TSource>(source, dueTime, scheduler);
  60. }
  61. private static IObservable<TSource> Delay_<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
  62. {
  63. return new Delay<TSource>(source, dueTime, scheduler);
  64. }
  65. #endregion
  66. #region DateTimeOffset
  67. public virtual IObservable<TSource> Delay<TSource>(IObservable<TSource> source, DateTimeOffset dueTime)
  68. {
  69. return Delay_<TSource>(source, dueTime, SchedulerDefaults.TimeBasedOperations);
  70. }
  71. public virtual IObservable<TSource> Delay<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
  72. {
  73. return Delay_<TSource>(source, dueTime, scheduler);
  74. }
  75. private static IObservable<TSource> Delay_<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
  76. {
  77. return new Delay<TSource>(source, dueTime, scheduler);
  78. }
  79. #endregion
  80. #region Duration selector
  81. public virtual IObservable<TSource> Delay<TSource, TDelay>(IObservable<TSource> source, Func<TSource, IObservable<TDelay>> delayDurationSelector)
  82. {
  83. return Delay_<TSource, TDelay>(source, null, delayDurationSelector);
  84. }
  85. public virtual IObservable<TSource> Delay<TSource, TDelay>(IObservable<TSource> source, IObservable<TDelay> subscriptionDelay, Func<TSource, IObservable<TDelay>> delayDurationSelector)
  86. {
  87. return Delay_<TSource, TDelay>(source, subscriptionDelay, delayDurationSelector);
  88. }
  89. private static IObservable<TSource> Delay_<TSource, TDelay>(IObservable<TSource> source, IObservable<TDelay> subscriptionDelay, Func<TSource, IObservable<TDelay>> delayDurationSelector)
  90. {
  91. return new Delay<TSource, TDelay>(source, subscriptionDelay, delayDurationSelector);
  92. }
  93. #endregion
  94. #endregion
  95. #region + DelaySubscription +
  96. public virtual IObservable<TSource> DelaySubscription<TSource>(IObservable<TSource> source, TimeSpan dueTime)
  97. {
  98. return DelaySubscription_<TSource>(source, dueTime, SchedulerDefaults.TimeBasedOperations);
  99. }
  100. public virtual IObservable<TSource> DelaySubscription<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
  101. {
  102. return DelaySubscription_<TSource>(source, dueTime, scheduler);
  103. }
  104. private static IObservable<TSource> DelaySubscription_<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
  105. {
  106. return new DelaySubscription<TSource>.Relative(source, dueTime, scheduler);
  107. }
  108. public virtual IObservable<TSource> DelaySubscription<TSource>(IObservable<TSource> source, DateTimeOffset dueTime)
  109. {
  110. return DelaySubscription_<TSource>(source, dueTime, SchedulerDefaults.TimeBasedOperations);
  111. }
  112. public virtual IObservable<TSource> DelaySubscription<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
  113. {
  114. return DelaySubscription_<TSource>(source, dueTime, scheduler);
  115. }
  116. private static IObservable<TSource> DelaySubscription_<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
  117. {
  118. return new DelaySubscription<TSource>.Absolute(source, dueTime, scheduler);
  119. }
  120. #endregion
  121. #region + Generate +
  122. public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector)
  123. {
  124. return Generate_<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, SchedulerDefaults.TimeBasedOperations);
  125. }
  126. public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IScheduler scheduler)
  127. {
  128. return Generate_<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, scheduler);
  129. }
  130. private static IObservable<TResult> Generate_<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IScheduler scheduler)
  131. {
  132. return new Generate<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, scheduler);
  133. }
  134. public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector)
  135. {
  136. return Generate_<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, SchedulerDefaults.TimeBasedOperations);
  137. }
  138. public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector, IScheduler scheduler)
  139. {
  140. return Generate_<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, scheduler);
  141. }
  142. private static IObservable<TResult> Generate_<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector, IScheduler scheduler)
  143. {
  144. return new Generate<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, scheduler);
  145. }
  146. #endregion
  147. #region + Interval +
  148. public virtual IObservable<long> Interval(TimeSpan period)
  149. {
  150. return Timer_(period, period, SchedulerDefaults.TimeBasedOperations);
  151. }
  152. public virtual IObservable<long> Interval(TimeSpan period, IScheduler scheduler)
  153. {
  154. return Timer_(period, period, scheduler);
  155. }
  156. #endregion
  157. #region + Sample +
  158. public virtual IObservable<TSource> Sample<TSource>(IObservable<TSource> source, TimeSpan interval)
  159. {
  160. return Sample_<TSource>(source, interval, SchedulerDefaults.TimeBasedOperations);
  161. }
  162. public virtual IObservable<TSource> Sample<TSource>(IObservable<TSource> source, TimeSpan interval, IScheduler scheduler)
  163. {
  164. return Sample_<TSource>(source, interval, scheduler);
  165. }
  166. private static IObservable<TSource> Sample_<TSource>(IObservable<TSource> source, TimeSpan interval, IScheduler scheduler)
  167. {
  168. return new Sample<TSource>(source, interval, scheduler);
  169. }
  170. public virtual IObservable<TSource> Sample<TSource, TSample>(IObservable<TSource> source, IObservable<TSample> sampler)
  171. {
  172. return Sample_<TSource, TSample>(source, sampler);
  173. }
  174. private static IObservable<TSource> Sample_<TSource, TSample>(IObservable<TSource> source, IObservable<TSample> sampler)
  175. {
  176. return new Sample<TSource, TSample>(source, sampler);
  177. }
  178. #endregion
  179. #region + Skip +
  180. public virtual IObservable<TSource> Skip<TSource>(IObservable<TSource> source, TimeSpan duration)
  181. {
  182. return Skip_<TSource>(source, duration, SchedulerDefaults.TimeBasedOperations);
  183. }
  184. public virtual IObservable<TSource> Skip<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
  185. {
  186. return Skip_<TSource>(source, duration, scheduler);
  187. }
  188. private static IObservable<TSource> Skip_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
  189. {
  190. var skip = source as Skip<TSource>.Time;
  191. if (skip != null && skip._scheduler == scheduler)
  192. return skip.Combine(duration);
  193. return new Skip<TSource>.Time(source, duration, scheduler);
  194. }
  195. #endregion
  196. #region + SkipLast +
  197. public virtual IObservable<TSource> SkipLast<TSource>(IObservable<TSource> source, TimeSpan duration)
  198. {
  199. return SkipLast_<TSource>(source, duration, SchedulerDefaults.TimeBasedOperations);
  200. }
  201. public virtual IObservable<TSource> SkipLast<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
  202. {
  203. return SkipLast_<TSource>(source, duration, scheduler);
  204. }
  205. private static IObservable<TSource> SkipLast_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
  206. {
  207. return new SkipLast<TSource>.Time(source, duration, scheduler);
  208. }
  209. #endregion
  210. #region + SkipUntil +
  211. public virtual IObservable<TSource> SkipUntil<TSource>(IObservable<TSource> source, DateTimeOffset startTime)
  212. {
  213. return SkipUntil_<TSource>(source, startTime, SchedulerDefaults.TimeBasedOperations);
  214. }
  215. public virtual IObservable<TSource> SkipUntil<TSource>(IObservable<TSource> source, DateTimeOffset startTime, IScheduler scheduler)
  216. {
  217. return SkipUntil_<TSource>(source, startTime, scheduler);
  218. }
  219. private static IObservable<TSource> SkipUntil_<TSource>(IObservable<TSource> source, DateTimeOffset startTime, IScheduler scheduler)
  220. {
  221. var skipUntil = source as SkipUntil<TSource>;
  222. if (skipUntil != null && skipUntil._scheduler == scheduler)
  223. return skipUntil.Combine(startTime);
  224. return new SkipUntil<TSource>(source, startTime, scheduler);
  225. }
  226. #endregion
  227. #region + Take +
  228. public virtual IObservable<TSource> Take<TSource>(IObservable<TSource> source, TimeSpan duration)
  229. {
  230. return Take_<TSource>(source, duration, SchedulerDefaults.TimeBasedOperations);
  231. }
  232. public virtual IObservable<TSource> Take<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
  233. {
  234. return Take_<TSource>(source, duration, scheduler);
  235. }
  236. private static IObservable<TSource> Take_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
  237. {
  238. var take = source as Take<TSource>.Time;
  239. if (take != null && take._scheduler == scheduler)
  240. return take.Combine(duration);
  241. return new Take<TSource>.Time(source, duration, scheduler);
  242. }
  243. #endregion
  244. #region + TakeLast +
  245. public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, TimeSpan duration)
  246. {
  247. return TakeLast_<TSource>(source, duration, SchedulerDefaults.TimeBasedOperations, SchedulerDefaults.Iteration);
  248. }
  249. public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
  250. {
  251. return TakeLast_<TSource>(source, duration, scheduler, SchedulerDefaults.Iteration);
  252. }
  253. public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler timerScheduler, IScheduler loopScheduler)
  254. {
  255. return TakeLast_<TSource>(source, duration, timerScheduler, loopScheduler);
  256. }
  257. private static IObservable<TSource> TakeLast_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler timerScheduler, IScheduler loopScheduler)
  258. {
  259. return new TakeLast<TSource>.Time(source, duration, timerScheduler, loopScheduler);
  260. }
  261. public virtual IObservable<IList<TSource>> TakeLastBuffer<TSource>(IObservable<TSource> source, TimeSpan duration)
  262. {
  263. return TakeLastBuffer_<TSource>(source, duration, SchedulerDefaults.TimeBasedOperations);
  264. }
  265. public virtual IObservable<IList<TSource>> TakeLastBuffer<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
  266. {
  267. return TakeLastBuffer_<TSource>(source, duration, scheduler);
  268. }
  269. private static IObservable<IList<TSource>> TakeLastBuffer_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
  270. {
  271. return new TakeLastBuffer<TSource>.Time(source, duration, scheduler);
  272. }
  273. #endregion
  274. #region + TakeUntil +
  275. public virtual IObservable<TSource> TakeUntil<TSource>(IObservable<TSource> source, DateTimeOffset endTime)
  276. {
  277. return TakeUntil_<TSource>(source, endTime, SchedulerDefaults.TimeBasedOperations);
  278. }
  279. public virtual IObservable<TSource> TakeUntil<TSource>(IObservable<TSource> source, DateTimeOffset endTime, IScheduler scheduler)
  280. {
  281. return TakeUntil_<TSource>(source, endTime, scheduler);
  282. }
  283. private static IObservable<TSource> TakeUntil_<TSource>(IObservable<TSource> source, DateTimeOffset endTime, IScheduler scheduler)
  284. {
  285. var takeUntil = source as TakeUntil<TSource>;
  286. if (takeUntil != null && takeUntil._scheduler == scheduler)
  287. return takeUntil.Combine(endTime);
  288. return new TakeUntil<TSource>(source, endTime, scheduler);
  289. }
  290. #endregion
  291. #region + Throttle +
  292. public virtual IObservable<TSource> Throttle<TSource>(IObservable<TSource> source, TimeSpan dueTime)
  293. {
  294. return Throttle_<TSource>(source, dueTime, SchedulerDefaults.TimeBasedOperations);
  295. }
  296. public virtual IObservable<TSource> Throttle<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
  297. {
  298. return Throttle_<TSource>(source, dueTime, scheduler);
  299. }
  300. private static IObservable<TSource> Throttle_<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
  301. {
  302. return new Throttle<TSource>(source, dueTime, scheduler);
  303. }
  304. public virtual IObservable<TSource> Throttle<TSource, TThrottle>(IObservable<TSource> source, Func<TSource, IObservable<TThrottle>> throttleDurationSelector)
  305. {
  306. return new Throttle<TSource, TThrottle>(source, throttleDurationSelector);
  307. }
  308. #endregion
  309. #region + TimeInterval +
  310. public virtual IObservable<System.Reactive.TimeInterval<TSource>> TimeInterval<TSource>(IObservable<TSource> source)
  311. {
  312. return TimeInterval_<TSource>(source, SchedulerDefaults.TimeBasedOperations);
  313. }
  314. public virtual IObservable<System.Reactive.TimeInterval<TSource>> TimeInterval<TSource>(IObservable<TSource> source, IScheduler scheduler)
  315. {
  316. return TimeInterval_<TSource>(source, scheduler);
  317. }
  318. private static IObservable<System.Reactive.TimeInterval<TSource>> TimeInterval_<TSource>(IObservable<TSource> source, IScheduler scheduler)
  319. {
  320. return new TimeInterval<TSource>(source, scheduler);
  321. }
  322. #endregion
  323. #region + Timeout +
  324. #region TimeSpan
  325. public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, TimeSpan dueTime)
  326. {
  327. return Timeout_<TSource>(source, dueTime, Observable.Throw<TSource>(new TimeoutException()), SchedulerDefaults.TimeBasedOperations);
  328. }
  329. public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
  330. {
  331. return Timeout_<TSource>(source, dueTime, Observable.Throw<TSource>(new TimeoutException()), scheduler);
  332. }
  333. public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, TimeSpan dueTime, IObservable<TSource> other)
  334. {
  335. return Timeout_<TSource>(source, dueTime, other, SchedulerDefaults.TimeBasedOperations);
  336. }
  337. public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, TimeSpan dueTime, IObservable<TSource> other, IScheduler scheduler)
  338. {
  339. return Timeout_<TSource>(source, dueTime, other, scheduler);
  340. }
  341. private static IObservable<TSource> Timeout_<TSource>(IObservable<TSource> source, TimeSpan dueTime, IObservable<TSource> other, IScheduler scheduler)
  342. {
  343. return new Timeout<TSource>.Relative(source, dueTime, other, scheduler);
  344. }
  345. #endregion
  346. #region DateTimeOffset
  347. public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, DateTimeOffset dueTime)
  348. {
  349. return Timeout_<TSource>(source, dueTime, Observable.Throw<TSource>(new TimeoutException()), SchedulerDefaults.TimeBasedOperations);
  350. }
  351. public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
  352. {
  353. return Timeout_<TSource>(source, dueTime, Observable.Throw<TSource>(new TimeoutException()), scheduler);
  354. }
  355. public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IObservable<TSource> other)
  356. {
  357. return Timeout_<TSource>(source, dueTime, other, SchedulerDefaults.TimeBasedOperations);
  358. }
  359. public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IObservable<TSource> other, IScheduler scheduler)
  360. {
  361. return Timeout_<TSource>(source, dueTime, other, scheduler);
  362. }
  363. private static IObservable<TSource> Timeout_<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IObservable<TSource> other, IScheduler scheduler)
  364. {
  365. return new Timeout<TSource>.Absolute(source, dueTime, other, scheduler);
  366. }
  367. #endregion
  368. #region Duration selector
  369. public virtual IObservable<TSource> Timeout<TSource, TTimeout>(IObservable<TSource> source, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector)
  370. {
  371. return Timeout_<TSource, TTimeout>(source, Observable.Never<TTimeout>(), timeoutDurationSelector, Observable.Throw<TSource>(new TimeoutException()));
  372. }
  373. public virtual IObservable<TSource> Timeout<TSource, TTimeout>(IObservable<TSource> source, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector, IObservable<TSource> other)
  374. {
  375. return Timeout_<TSource, TTimeout>(source, Observable.Never<TTimeout>(), timeoutDurationSelector, other);
  376. }
  377. public virtual IObservable<TSource> Timeout<TSource, TTimeout>(IObservable<TSource> source, IObservable<TTimeout> firstTimeout, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector)
  378. {
  379. return Timeout_<TSource, TTimeout>(source, firstTimeout, timeoutDurationSelector, Observable.Throw<TSource>(new TimeoutException()));
  380. }
  381. public virtual IObservable<TSource> Timeout<TSource, TTimeout>(IObservable<TSource> source, IObservable<TTimeout> firstTimeout, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector, IObservable<TSource> other)
  382. {
  383. return Timeout_<TSource, TTimeout>(source, firstTimeout, timeoutDurationSelector, other);
  384. }
  385. private static IObservable<TSource> Timeout_<TSource, TTimeout>(IObservable<TSource> source, IObservable<TTimeout> firstTimeout, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector, IObservable<TSource> other)
  386. {
  387. return new Timeout<TSource, TTimeout>(source, firstTimeout, timeoutDurationSelector, other);
  388. }
  389. #endregion
  390. #endregion
  391. #region + Timer +
  392. public virtual IObservable<long> Timer(TimeSpan dueTime)
  393. {
  394. return Timer_(dueTime, SchedulerDefaults.TimeBasedOperations);
  395. }
  396. public virtual IObservable<long> Timer(DateTimeOffset dueTime)
  397. {
  398. return Timer_(dueTime, SchedulerDefaults.TimeBasedOperations);
  399. }
  400. public virtual IObservable<long> Timer(TimeSpan dueTime, TimeSpan period)
  401. {
  402. return Timer_(dueTime, period, SchedulerDefaults.TimeBasedOperations);
  403. }
  404. public virtual IObservable<long> Timer(DateTimeOffset dueTime, TimeSpan period)
  405. {
  406. return Timer_(dueTime, period, SchedulerDefaults.TimeBasedOperations);
  407. }
  408. public virtual IObservable<long> Timer(TimeSpan dueTime, IScheduler scheduler)
  409. {
  410. return Timer_(dueTime, scheduler);
  411. }
  412. public virtual IObservable<long> Timer(DateTimeOffset dueTime, IScheduler scheduler)
  413. {
  414. return Timer_(dueTime, scheduler);
  415. }
  416. public virtual IObservable<long> Timer(TimeSpan dueTime, TimeSpan period, IScheduler scheduler)
  417. {
  418. return Timer_(dueTime, period, scheduler);
  419. }
  420. public virtual IObservable<long> Timer(DateTimeOffset dueTime, TimeSpan period, IScheduler scheduler)
  421. {
  422. return Timer_(dueTime, period, scheduler);
  423. }
  424. private static IObservable<long> Timer_(TimeSpan dueTime, IScheduler scheduler)
  425. {
  426. return new Timer.Single.Relative(dueTime, scheduler);
  427. }
  428. private static IObservable<long> Timer_(TimeSpan dueTime, TimeSpan period, IScheduler scheduler)
  429. {
  430. return new Timer.Periodic.Relative(dueTime, period, scheduler);
  431. }
  432. private static IObservable<long> Timer_(DateTimeOffset dueTime, IScheduler scheduler)
  433. {
  434. return new Timer.Single.Absolute(dueTime, scheduler);
  435. }
  436. private static IObservable<long> Timer_(DateTimeOffset dueTime, TimeSpan period, IScheduler scheduler)
  437. {
  438. return new Timer.Periodic.Absolute(dueTime, period, scheduler);
  439. }
  440. #endregion
  441. #region + Timestamp +
  442. public virtual IObservable<Timestamped<TSource>> Timestamp<TSource>(IObservable<TSource> source)
  443. {
  444. return Timestamp_<TSource>(source, SchedulerDefaults.TimeBasedOperations);
  445. }
  446. public virtual IObservable<Timestamped<TSource>> Timestamp<TSource>(IObservable<TSource> source, IScheduler scheduler)
  447. {
  448. return Timestamp_<TSource>(source, scheduler);
  449. }
  450. private static IObservable<Timestamped<TSource>> Timestamp_<TSource>(IObservable<TSource> source, IScheduler scheduler)
  451. {
  452. return new Timestamp<TSource>(source, scheduler);
  453. }
  454. #endregion
  455. #region + Window +
  456. #region TimeSpan only
  457. public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan)
  458. {
  459. return Window_<TSource>(source, timeSpan, timeSpan, SchedulerDefaults.TimeBasedOperations);
  460. }
  461. public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, IScheduler scheduler)
  462. {
  463. return Window_<TSource>(source, timeSpan, timeSpan, scheduler);
  464. }
  465. public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift)
  466. {
  467. return Window_<TSource>(source, timeSpan, timeShift, SchedulerDefaults.TimeBasedOperations);
  468. }
  469. public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
  470. {
  471. return Window_<TSource>(source, timeSpan, timeShift, scheduler);
  472. }
  473. private static IObservable<IObservable<TSource>> Window_<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
  474. {
  475. return new Window<TSource>(source, timeSpan, timeShift, scheduler);
  476. }
  477. #endregion
  478. #region TimeSpan + int
  479. public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count)
  480. {
  481. return Window_<TSource>(source, timeSpan, count, SchedulerDefaults.TimeBasedOperations);
  482. }
  483. public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
  484. {
  485. return Window_<TSource>(source, timeSpan, count, scheduler);
  486. }
  487. private static IObservable<IObservable<TSource>> Window_<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
  488. {
  489. return new Window<TSource>(source, timeSpan, count, scheduler);
  490. }
  491. #endregion
  492. #endregion
  493. }
  494. }