QueryLanguage.Time.cs 68 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Disposables;
  7. using System.Reactive.Subjects;
  8. namespace System.Reactive.Linq
  9. {
  10. #if !NO_PERF
  11. using ObservableImpl;
  12. #endif
  13. internal partial class QueryLanguage
  14. {
  15. #region + Buffer +
  16. #region TimeSpan only
  /// <summary>
  /// Time-based Buffer overload without an explicit shift or scheduler: forwards to <c>Buffer_</c>,
  /// passing <paramref name="timeSpan"/> for both the span and the shift and
  /// <c>SchedulerDefaults.TimeBasedOperations</c> as the scheduler.
  /// </summary>
  public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan)
  {
      var scheduler = SchedulerDefaults.TimeBasedOperations;
      return Buffer_<TSource>(source, timeSpan, timeSpan, scheduler);
  }
  /// <summary>
  /// Time-based Buffer overload with an explicit scheduler: forwards to <c>Buffer_</c>,
  /// passing <paramref name="timeSpan"/> for both the span and the shift.
  /// </summary>
  public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, IScheduler scheduler)
  {
      return Buffer_<TSource>(
          source: source,
          timeSpan: timeSpan,
          timeShift: timeSpan,
          scheduler: scheduler);
  }
  /// <summary>
  /// Time-based Buffer overload with separate span and shift: forwards to <c>Buffer_</c>
  /// using <c>SchedulerDefaults.TimeBasedOperations</c> as the scheduler.
  /// </summary>
  public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift)
  {
      var scheduler = SchedulerDefaults.TimeBasedOperations;
      return Buffer_<TSource>(source, timeSpan, timeShift, scheduler);
  }
  /// <summary>
  /// Time-based Buffer overload with separate span, shift and scheduler: forwards all arguments
  /// unchanged to <c>Buffer_</c>.
  /// </summary>
  public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
  {
      return Buffer_<TSource>(
          source: source,
          timeSpan: timeSpan,
          timeShift: timeShift,
          scheduler: scheduler);
  }
  /// <summary>
  /// Shared implementation behind the span/shift Buffer overloads.
  /// </summary>
  /// <remarks>
  /// Unless NO_PERF is defined, this instantiates the <c>Buffer</c> operator class directly.
  /// With NO_PERF, it is composed from <c>Window</c> followed by <c>SelectMany(Observable.ToList)</c>.
  /// </remarks>
  private static IObservable<IList<TSource>> Buffer_<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
  {
#if !NO_PERF
      return new Buffer<TSource>(source, timeSpan, timeShift, scheduler);
#else
      var windows = source.Window(timeSpan, timeShift, scheduler);
      return windows.SelectMany(Observable.ToList);
#endif
  }
  41. #endregion
  42. #region TimeSpan + int
  /// <summary>
  /// Time-and-count Buffer overload without an explicit scheduler: forwards to <c>Buffer_</c>
  /// using <c>SchedulerDefaults.TimeBasedOperations</c>.
  /// </summary>
  public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count)
  {
      var scheduler = SchedulerDefaults.TimeBasedOperations;
      return Buffer_<TSource>(source, timeSpan, count, scheduler);
  }
  /// <summary>
  /// Time-and-count Buffer overload with an explicit scheduler: forwards all arguments
  /// unchanged to <c>Buffer_</c>.
  /// </summary>
  public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
  {
      return Buffer_<TSource>(
          source: source,
          timeSpan: timeSpan,
          count: count,
          scheduler: scheduler);
  }
  /// <summary>
  /// Shared implementation behind the time-and-count Buffer overloads.
  /// </summary>
  /// <remarks>
  /// Unless NO_PERF is defined, this instantiates the <c>Buffer</c> operator class directly.
  /// With NO_PERF, it is composed from <c>Window</c> followed by <c>SelectMany(Observable.ToList)</c>.
  /// </remarks>
  private static IObservable<IList<TSource>> Buffer_<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
  {
#if !NO_PERF
      return new Buffer<TSource>(source, timeSpan, count, scheduler);
#else
      var windows = source.Window(timeSpan, count, scheduler);
      return windows.SelectMany(Observable.ToList);
#endif
  }
  59. #endregion
  60. #endregion
  61. #region + Delay +
  62. #region TimeSpan
  /// <summary>
  /// Relative-time Delay overload without an explicit scheduler: forwards to <c>Delay_</c>
  /// using <c>SchedulerDefaults.TimeBasedOperations</c>.
  /// </summary>
  public virtual IObservable<TSource> Delay<TSource>(IObservable<TSource> source, TimeSpan dueTime)
  {
      var scheduler = SchedulerDefaults.TimeBasedOperations;
      return Delay_<TSource>(source, dueTime, scheduler);
  }
  /// <summary>
  /// Relative-time Delay overload with an explicit scheduler: forwards all arguments
  /// unchanged to <c>Delay_</c>.
  /// </summary>
  public virtual IObservable<TSource> Delay<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
  {
      return Delay_<TSource>(
          source: source,
          dueTime: dueTime,
          scheduler: scheduler);
  }
  /// <summary>
  /// Shared implementation behind the relative-time Delay overloads.
  /// </summary>
  /// <remarks>
  /// Unless NO_PERF is defined, this instantiates the <c>Delay</c> operator class directly.
  /// With NO_PERF, the source is materialized and timestamped; every non-error notification is
  /// queued with its timestamp shifted by <paramref name="dueTime"/> and a recursively scheduled
  /// action replays the notifications whose shifted timestamp has been reached. An error
  /// notification clears the queue; it is forwarded right away when no drain pass is running,
  /// otherwise the running pass reports it when it finishes.
  /// </remarks>
  private static IObservable<TSource> Delay_<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
  {
#if !NO_PERF
      return new Delay<TSource>(source, dueTime, scheduler);
#else
      return new AnonymousObservable<TSource>(observer =>
      {
          var sync = new object();
          var pending = new Queue<Timestamped<Notification<TSource>>>();
          var scheduled = false;  // a drain action is scheduled or in progress
          var draining = false;   // a drain pass is executing right now
          var timer = new SerialDisposable();
          var failure = default(Exception);

          var subscription = source.Materialize().Timestamp(scheduler).Subscribe(stamped =>
          {
              var startNow = false;

              lock (sync)
              {
                  if (stamped.Value.Kind == NotificationKind.OnError)
                  {
                      // Errors are not delayed: drop everything queued so far.
                      pending.Clear();
                      pending.Enqueue(stamped);
                      failure = stamped.Value.Exception;
                      startNow = !draining;
                  }
                  else
                  {
                      pending.Enqueue(new Timestamped<Notification<TSource>>(stamped.Value, stamped.Timestamp.Add(dueTime)));
                      startNow = !scheduled;
                      scheduled = true;
                  }
              }

              if (!startNow)
              {
                  return;
              }

              if (failure != null)
              {
                  observer.OnError(failure);
                  return;
              }

              var slot = new SingleAssignmentDisposable();
              timer.Disposable = slot;
              slot.Disposable = scheduler.Schedule(dueTime, self =>
              {
                  lock (sync)
                  {
                      if (failure != null)
                      {
                          return;
                      }

                      draining = true;
                  }

                  // Replay every notification whose shifted timestamp is not in the future.
                  Notification<TSource> due;
                  do
                  {
                      due = null;

                      lock (sync)
                      {
                          if (pending.Count > 0 && pending.Peek().Timestamp.CompareTo(scheduler.Now) <= 0)
                          {
                              due = pending.Dequeue().Value;
                          }
                      }

                      if (due != null)
                      {
                          due.Accept(observer);
                      }
                  } while (due != null);

                  var reschedule = false;
                  var nextDelay = TimeSpan.Zero;
                  var observed = default(Exception);

                  lock (sync)
                  {
                      if (pending.Count > 0)
                      {
                          reschedule = true;
                          nextDelay = TimeSpan.FromTicks(Math.Max(0, pending.Peek().Timestamp.Subtract(scheduler.Now).Ticks));
                      }
                      else
                      {
                          scheduled = false;
                      }

                      observed = failure;
                      draining = false;
                  }

                  if (observed != null)
                  {
                      observer.OnError(observed);
                  }
                  else if (reschedule)
                  {
                      self(nextDelay);
                  }
              });
          });

          return new CompositeDisposable(subscription, timer);
      });
#endif
  }
  158. #endregion
  159. #region DateTimeOffset
  /// <summary>
  /// Absolute-time Delay overload without an explicit scheduler: forwards to <c>Delay_</c>
  /// using <c>SchedulerDefaults.TimeBasedOperations</c>.
  /// </summary>
  public virtual IObservable<TSource> Delay<TSource>(IObservable<TSource> source, DateTimeOffset dueTime)
  {
      var scheduler = SchedulerDefaults.TimeBasedOperations;
      return Delay_<TSource>(source, dueTime, scheduler);
  }
  /// <summary>
  /// Absolute-time Delay overload with an explicit scheduler: forwards all arguments
  /// unchanged to <c>Delay_</c>.
  /// </summary>
  public virtual IObservable<TSource> Delay<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
  {
      return Delay_<TSource>(
          source: source,
          dueTime: dueTime,
          scheduler: scheduler);
  }
  /// <summary>
  /// Shared implementation behind the absolute-time Delay overloads.
  /// </summary>
  /// <remarks>
  /// Unless NO_PERF is defined, this instantiates the <c>Delay</c> operator class directly.
  /// With NO_PERF, the absolute time is converted to a relative one inside <c>Observable.Defer</c>,
  /// i.e. <c>scheduler.Now</c> is read when the deferred factory runs, and the relative-time
  /// <c>Delay_</c> is used from there.
  /// </remarks>
  private static IObservable<TSource> Delay_<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
  {
#if !NO_PERF
      return new Delay<TSource>(source, dueTime, scheduler);
#else
      return Observable.Defer(() =>
      {
          var remaining = dueTime.Subtract(scheduler.Now);
          return Delay_<TSource>(source, remaining, scheduler);
      });
#endif
  }
  180. #endregion
  181. #region Duration selector
  /// <summary>
  /// Per-element Delay overload without a subscription delay: forwards to <c>Delay_</c>
  /// with <c>null</c> for the subscription delay.
  /// </summary>
  public virtual IObservable<TSource> Delay<TSource, TDelay>(IObservable<TSource> source, Func<TSource, IObservable<TDelay>> delayDurationSelector)
  {
      return Delay_<TSource, TDelay>(
          source: source,
          subscriptionDelay: null,
          delayDurationSelector: delayDurationSelector);
  }
  /// <summary>
  /// Per-element Delay overload with a subscription delay: forwards all arguments
  /// unchanged to <c>Delay_</c>.
  /// </summary>
  public virtual IObservable<TSource> Delay<TSource, TDelay>(IObservable<TSource> source, IObservable<TDelay> subscriptionDelay, Func<TSource, IObservable<TDelay>> delayDurationSelector)
  {
      return Delay_<TSource, TDelay>(
          source: source,
          subscriptionDelay: subscriptionDelay,
          delayDurationSelector: delayDurationSelector);
  }
  /// <summary>
  /// Shared implementation behind the Delay overloads that take a per-element delay selector.
  /// </summary>
  /// <remarks>
  /// Unless NO_PERF is defined, this instantiates the <c>Delay</c> operator class directly.
  /// With NO_PERF, each source element is held until the sequence returned by
  /// <paramref name="delayDurationSelector"/> for it produces a value or completes; completion is
  /// signalled once the source has completed and no delay subscriptions remain. When
  /// <paramref name="subscriptionDelay"/> is not null, the source is subscribed from that
  /// sequence's OnNext and OnCompleted callbacks instead of immediately.
  /// </remarks>
  private static IObservable<TSource> Delay_<TSource, TDelay>(IObservable<TSource> source, IObservable<TDelay> subscriptionDelay, Func<TSource, IObservable<TDelay>> delayDurationSelector)
  {
#if !NO_PERF
      return new Delay<TSource, TDelay>(source, subscriptionDelay, delayDurationSelector);
#else
      return new AnonymousObservable<TSource>(observer =>
      {
          var pendingDelays = new CompositeDisposable();
          var sync = new object();
          var sourceCompleted = false;

          // Signals completion once the source is done and nothing is being delayed anymore.
          Action completeIfDrained = () =>
          {
              if (sourceCompleted && pendingDelays.Count == 0)
              {
                  observer.OnCompleted();
              }
          };

          var subscription = new SerialDisposable();

          Action subscribeToSource = () =>
          {
              subscription.Disposable = source.Subscribe(
                  x =>
                  {
                      IObservable<TDelay> delay = null;
                      try
                      {
                          delay = delayDurationSelector(x);
                      }
                      catch (Exception error)
                      {
                          lock (sync)
                          {
                              observer.OnError(error);
                          }

                          return;
                      }

                      var handle = new SingleAssignmentDisposable();
                      pendingDelays.Add(handle);

                      // A value and a completion of the delay sequence are handled the same way.
                      Action release = () =>
                      {
                          lock (sync)
                          {
                              observer.OnNext(x);
                              pendingDelays.Remove(handle);
                              completeIfDrained();
                          }
                      };

                      handle.Disposable = delay.Subscribe(
                          _ =>
                          {
                              release();
                          },
                          exception =>
                          {
                              lock (sync)
                              {
                                  observer.OnError(exception);
                              }
                          },
                          release
                      );
                  },
                  exception =>
                  {
                      lock (sync)
                      {
                          observer.OnError(exception);
                      }
                  },
                  () =>
                  {
                      lock (sync)
                      {
                          sourceCompleted = true;
                          subscription.Dispose();
                          completeIfDrained();
                      }
                  }
              );
          };

          if (subscriptionDelay != null)
          {
              subscription.Disposable = subscriptionDelay.Subscribe(
                  _ =>
                  {
                      subscribeToSource();
                  },
                  observer.OnError,
                  subscribeToSource
              );
          }
          else
          {
              subscribeToSource();
          }

          return new CompositeDisposable(subscription, pendingDelays);
      });
#endif
  }
  289. #endregion
  290. #endregion
  291. #region + DelaySubscription +
  /// <summary>
  /// Relative-time DelaySubscription overload without an explicit scheduler: forwards to
  /// <c>DelaySubscription_</c> using <c>SchedulerDefaults.TimeBasedOperations</c>.
  /// </summary>
  public virtual IObservable<TSource> DelaySubscription<TSource>(IObservable<TSource> source, TimeSpan dueTime)
  {
      var scheduler = SchedulerDefaults.TimeBasedOperations;
      return DelaySubscription_<TSource>(source, dueTime, scheduler);
  }
  /// <summary>
  /// Relative-time DelaySubscription overload with an explicit scheduler: forwards all arguments
  /// unchanged to <c>DelaySubscription_</c>.
  /// </summary>
  public virtual IObservable<TSource> DelaySubscription<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
  {
      return DelaySubscription_<TSource>(
          source: source,
          dueTime: dueTime,
          scheduler: scheduler);
  }
  /// <summary>
  /// Shared implementation behind the relative-time DelaySubscription overloads.
  /// </summary>
  /// <remarks>
  /// Unless NO_PERF is defined, this instantiates the <c>DelaySubscription</c> operator class.
  /// With NO_PERF, the subscription to <paramref name="source"/> is made from an action scheduled
  /// after <c>Normalize(dueTime)</c>; the returned disposable first holds the scheduled action
  /// and is then reassigned to the source subscription.
  /// </remarks>
  private static IObservable<TSource> DelaySubscription_<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
  {
#if !NO_PERF
      return new DelaySubscription<TSource>(source, dueTime, scheduler);
#else
      return new AnonymousObservable<TSource>(observer =>
      {
          var slot = new MultipleAssignmentDisposable();
          var normalizedDueTime = Normalize(dueTime);

          var timer = scheduler.Schedule(normalizedDueTime, () =>
          {
              slot.Disposable = source.Subscribe(observer);
          });
          slot.Disposable = timer;

          return slot;
      });
#endif
  }
  /// <summary>
  /// Absolute-time DelaySubscription overload without an explicit scheduler: forwards to
  /// <c>DelaySubscription_</c> using <c>SchedulerDefaults.TimeBasedOperations</c>.
  /// </summary>
  public virtual IObservable<TSource> DelaySubscription<TSource>(IObservable<TSource> source, DateTimeOffset dueTime)
  {
      var scheduler = SchedulerDefaults.TimeBasedOperations;
      return DelaySubscription_<TSource>(source, dueTime, scheduler);
  }
  /// <summary>
  /// Absolute-time DelaySubscription overload with an explicit scheduler: forwards all arguments
  /// unchanged to <c>DelaySubscription_</c>.
  /// </summary>
  public virtual IObservable<TSource> DelaySubscription<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
  {
      return DelaySubscription_<TSource>(
          source: source,
          dueTime: dueTime,
          scheduler: scheduler);
  }
  /// <summary>
  /// Shared implementation behind the absolute-time DelaySubscription overloads.
  /// </summary>
  /// <remarks>
  /// Unless NO_PERF is defined, this instantiates the <c>DelaySubscription</c> operator class.
  /// With NO_PERF, the subscription to <paramref name="source"/> is made from an action scheduled
  /// at <paramref name="dueTime"/>; the returned disposable first holds the scheduled action
  /// and is then reassigned to the source subscription.
  /// </remarks>
  private static IObservable<TSource> DelaySubscription_<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
  {
#if !NO_PERF
      return new DelaySubscription<TSource>(source, dueTime, scheduler);
#else
      return new AnonymousObservable<TSource>(observer =>
      {
          var slot = new MultipleAssignmentDisposable();

          var timer = scheduler.Schedule(dueTime, () =>
          {
              slot.Disposable = source.Subscribe(observer);
          });
          slot.Disposable = timer;

          return slot;
      });
#endif
  }
  341. #endregion
  342. #region + Generate +
  /// <summary>
  /// Generate overload with relative time per element and no explicit scheduler: forwards to
  /// <c>Generate_</c> using <c>SchedulerDefaults.TimeBasedOperations</c>.
  /// </summary>
  public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector)
  {
      var scheduler = SchedulerDefaults.TimeBasedOperations;
      return Generate_<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, scheduler);
  }
  /// <summary>
  /// Generate overload with relative time per element and an explicit scheduler: forwards all
  /// arguments unchanged to <c>Generate_</c>.
  /// </summary>
  public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IScheduler scheduler)
  {
      return Generate_<TState, TResult>(
          initialState: initialState,
          condition: condition,
          iterate: iterate,
          resultSelector: resultSelector,
          timeSelector: timeSelector,
          scheduler: scheduler);
  }
  /// <summary>
  /// Shared implementation behind the Generate overloads whose time selector returns a TimeSpan.
  /// </summary>
  /// <remarks>
  /// Unless NO_PERF is defined, this instantiates the <c>Generate</c> operator class directly.
  /// With NO_PERF, a recursively scheduled action emits the result computed by the previous
  /// invocation, advances the state (except on the first run), evaluates the condition and either
  /// reschedules itself after the selected time or completes. An exception thrown by any of the
  /// user delegates is forwarded through OnError.
  /// </remarks>
  private static IObservable<TResult> Generate_<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IScheduler scheduler)
  {
#if !NO_PERF
      return new Generate<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, scheduler);
#else
      return new AnonymousObservable<TResult>(observer =>
      {
          var current = initialState;
          var isFirst = true;
          var hasPending = false;
          var pendingValue = default(TResult);
          var delay = default(TimeSpan);

          return scheduler.Schedule(TimeSpan.Zero, self =>
          {
              // Emit the value prepared by the previous invocation, now that its time has elapsed.
              if (hasPending)
              {
                  observer.OnNext(pendingValue);
              }

              try
              {
                  if (!isFirst)
                  {
                      current = iterate(current);
                  }
                  else
                  {
                      isFirst = false;
                  }

                  hasPending = condition(current);
                  if (hasPending)
                  {
                      pendingValue = resultSelector(current);
                      delay = timeSelector(current);
                  }
              }
              catch (Exception exception)
              {
                  observer.OnError(exception);
                  return;
              }

              if (!hasPending)
              {
                  observer.OnCompleted();
                  return;
              }

              self(delay);
          });
      });
#endif
  }
  /// <summary>
  /// Generate overload with absolute time per element and no explicit scheduler: forwards to
  /// <c>Generate_</c> using <c>SchedulerDefaults.TimeBasedOperations</c>.
  /// </summary>
  public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector)
  {
      var scheduler = SchedulerDefaults.TimeBasedOperations;
      return Generate_<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, scheduler);
  }
  /// <summary>
  /// Generate overload with absolute time per element and an explicit scheduler: forwards all
  /// arguments unchanged to <c>Generate_</c>.
  /// </summary>
  public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector, IScheduler scheduler)
  {
      return Generate_<TState, TResult>(
          initialState: initialState,
          condition: condition,
          iterate: iterate,
          resultSelector: resultSelector,
          timeSelector: timeSelector,
          scheduler: scheduler);
  }
  /// <summary>
  /// Shared implementation behind the Generate overloads whose time selector returns a
  /// DateTimeOffset.
  /// </summary>
  /// <remarks>
  /// Unless NO_PERF is defined, this instantiates the <c>Generate</c> operator class directly.
  /// With NO_PERF, a recursively scheduled action (first scheduled at <c>scheduler.Now</c>) emits
  /// the result computed by the previous invocation, advances the state (except on the first
  /// run), evaluates the condition and either reschedules itself at the selected time or
  /// completes. An exception thrown by any of the user delegates is forwarded through OnError.
  /// </remarks>
  private static IObservable<TResult> Generate_<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector, IScheduler scheduler)
  {
#if !NO_PERF
      return new Generate<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, scheduler);
#else
      return new AnonymousObservable<TResult>(observer =>
      {
          var current = initialState;
          var isFirst = true;
          var hasPending = false;
          var pendingValue = default(TResult);
          var dueAt = default(DateTimeOffset);

          return scheduler.Schedule(scheduler.Now, self =>
          {
              // Emit the value prepared by the previous invocation, now that its time has come.
              if (hasPending)
              {
                  observer.OnNext(pendingValue);
              }

              try
              {
                  if (!isFirst)
                  {
                      current = iterate(current);
                  }
                  else
                  {
                      isFirst = false;
                  }

                  hasPending = condition(current);
                  if (hasPending)
                  {
                      pendingValue = resultSelector(current);
                      dueAt = timeSelector(current);
                  }
              }
              catch (Exception exception)
              {
                  observer.OnError(exception);
                  return;
              }

              if (!hasPending)
              {
                  observer.OnCompleted();
                  return;
              }

              self(dueAt);
          });
      });
#endif
  }
  443. #endregion
  444. #region + Interval +
  /// <summary>
  /// Interval overload without an explicit scheduler: forwards to <c>Timer_</c>, passing
  /// <paramref name="period"/> for both of its time arguments and
  /// <c>SchedulerDefaults.TimeBasedOperations</c> as the scheduler.
  /// </summary>
  public virtual IObservable<long> Interval(TimeSpan period)
  {
      var scheduler = SchedulerDefaults.TimeBasedOperations;
      return Timer_(period, period, scheduler);
  }
  /// <summary>
  /// Interval overload with an explicit scheduler: forwards to <c>Timer_</c>, passing
  /// <paramref name="period"/> for both of its time arguments.
  /// </summary>
  public virtual IObservable<long> Interval(TimeSpan period, IScheduler scheduler)
  {
      var ticks = Timer_(period, period, scheduler);
      return ticks;
  }
  453. #endregion
  454. #region + Sample +
  /// <summary>
  /// Interval-based Sample overload without an explicit scheduler: forwards to <c>Sample_</c>
  /// using <c>SchedulerDefaults.TimeBasedOperations</c>.
  /// </summary>
  public virtual IObservable<TSource> Sample<TSource>(IObservable<TSource> source, TimeSpan interval)
  {
      var scheduler = SchedulerDefaults.TimeBasedOperations;
      return Sample_<TSource>(source, interval, scheduler);
  }
  /// <summary>
  /// Interval-based Sample overload with an explicit scheduler: forwards all arguments
  /// unchanged to <c>Sample_</c>.
  /// </summary>
  public virtual IObservable<TSource> Sample<TSource>(IObservable<TSource> source, TimeSpan interval, IScheduler scheduler)
  {
      return Sample_<TSource>(
          source: source,
          interval: interval,
          scheduler: scheduler);
  }
  /// <summary>
  /// Shared implementation behind the interval-based Sample overloads.
  /// </summary>
  /// <remarks>
  /// Unless NO_PERF is defined, this instantiates the <c>Sample</c> operator class directly.
  /// With NO_PERF, an <c>Observable.Interval</c> sequence is used as the sampler for the
  /// sampler-based <c>Sample_</c>.
  /// </remarks>
  private static IObservable<TSource> Sample_<TSource>(IObservable<TSource> source, TimeSpan interval, IScheduler scheduler)
  {
#if !NO_PERF
      return new Sample<TSource>(source, interval, scheduler);
#else
      var ticks = Observable.Interval(interval, scheduler);

      return Sample_<TSource, long>(source, ticks);
#endif
  }
  /// <summary>
  /// Sampler-based Sample overload: forwards both arguments unchanged to <c>Sample_</c>.
  /// </summary>
  public virtual IObservable<TSource> Sample<TSource, TSample>(IObservable<TSource> source, IObservable<TSample> sampler)
  {
      return Sample_<TSource, TSample>(
          source: source,
          sampler: sampler);
  }
  /// <summary>
  /// Shared implementation behind the sampler-based Sample overloads.
  /// </summary>
  /// <remarks>
  /// Unless NO_PERF is defined, this instantiates the <c>Sample</c> operator class directly.
  /// With NO_PERF, the two sequences are combined through <c>Combine</c>: the latest OnNext
  /// notification of the source is remembered, a source error is forwarded immediately, and each
  /// sampler callback emits (and clears) the remembered notification, then completes the observer
  /// if the source has already completed.
  /// </remarks>
  private static IObservable<TSource> Sample_<TSource, TSample>(IObservable<TSource> source, IObservable<TSample> sampler)
  {
#if !NO_PERF
      return new Sample<TSource, TSample>(source, sampler);
#else
      return Combine(source, sampler, (IObserver<TSource> observer, IDisposable leftSubscription, IDisposable rightSubscription) =>
      {
          var latest = default(Notification<TSource>);
          var sourceDone = false;

          return new BinaryObserver<TSource, TSample>(
              fromSource =>
              {
                  var kind = fromSource.Kind;
                  if (kind == NotificationKind.OnNext)
                  {
                      latest = fromSource;
                  }
                  else if (kind == NotificationKind.OnError)
                  {
                      fromSource.Accept(observer);
                  }
                  else if (kind == NotificationKind.OnCompleted)
                  {
                      sourceDone = true;
                  }
              },
              _ =>
              {
                  // Take the remembered notification so it is emitted at most once.
                  var toEmit = latest;
                  latest = null;

                  if (toEmit != null)
                  {
                      toEmit.Accept(observer);
                  }

                  if (sourceDone)
                  {
                      observer.OnCompleted();
                  }
              });
      });
#endif
  }
  513. #endregion
  514. #region + Skip +
  /// <summary>
  /// Duration-based Skip overload without an explicit scheduler: forwards to <c>Skip_</c>
  /// using <c>SchedulerDefaults.TimeBasedOperations</c>.
  /// </summary>
  public virtual IObservable<TSource> Skip<TSource>(IObservable<TSource> source, TimeSpan duration)
  {
      var scheduler = SchedulerDefaults.TimeBasedOperations;
      return Skip_<TSource>(source, duration, scheduler);
  }
  /// <summary>
  /// Duration-based Skip overload with an explicit scheduler: forwards all arguments
  /// unchanged to <c>Skip_</c>.
  /// </summary>
  public virtual IObservable<TSource> Skip<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
  {
      return Skip_<TSource>(
          source: source,
          duration: duration,
          scheduler: scheduler);
  }
  /// <summary>
  /// Shared implementation behind the duration-based Skip overloads.
  /// </summary>
  /// <remarks>
  /// Unless NO_PERF is defined: when <paramref name="source"/> is itself a <c>Skip</c> operator
  /// on the same scheduler, its <c>Omega</c> method is asked for the result; otherwise a new
  /// <c>Skip</c> operator is created.
  /// With NO_PERF, elements are dropped until an action scheduled after
  /// <paramref name="duration"/> flips a flag; errors and completion always pass through.
  /// </remarks>
  private static IObservable<TSource> Skip_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
  {
#if !NO_PERF
      var upstream = source as Skip<TSource>;
      if (upstream != null && upstream._scheduler == scheduler)
      {
          return upstream.Omega(duration);
      }

      return new Skip<TSource>(source, duration, scheduler);
#else
      return new AnonymousObservable<TSource>(observer =>
      {
          var gateOpen = false;
          var timer = scheduler.Schedule(duration, () => gateOpen = true);

          var subscription = source.Subscribe(
              x =>
              {
                  if (gateOpen)
                  {
                      observer.OnNext(x);
                  }
              },
              observer.OnError,
              observer.OnCompleted
          );

          return new CompositeDisposable(timer, subscription);
      });
#endif
  }
  548. #endregion
  549. #region + SkipLast +
  /// <summary>
  /// Duration-based SkipLast overload without an explicit scheduler: forwards to
  /// <c>SkipLast_</c> using <c>SchedulerDefaults.TimeBasedOperations</c>.
  /// </summary>
  public virtual IObservable<TSource> SkipLast<TSource>(IObservable<TSource> source, TimeSpan duration)
  {
      var scheduler = SchedulerDefaults.TimeBasedOperations;
      return SkipLast_<TSource>(source, duration, scheduler);
  }
  /// <summary>
  /// Duration-based SkipLast overload with an explicit scheduler: forwards all arguments
  /// unchanged to <c>SkipLast_</c>.
  /// </summary>
  public virtual IObservable<TSource> SkipLast<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
  {
      return SkipLast_<TSource>(
          source: source,
          duration: duration,
          scheduler: scheduler);
  }
  /// <summary>
  /// Shared implementation behind the duration-based SkipLast overloads.
  /// </summary>
  /// <remarks>
  /// Unless NO_PERF is defined, this instantiates the <c>SkipLast</c> operator class directly.
  /// With NO_PERF, every element is queued with the stopwatch time at which it arrived; on each
  /// new element and on completion, queued elements that are at least
  /// <paramref name="duration"/> old are emitted. Anything still queued after the completion
  /// pass is never emitted.
  /// </remarks>
  private static IObservable<TSource> SkipLast_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
  {
#if !NO_PERF
      return new SkipLast<TSource>(source, duration, scheduler);
#else
      return new AnonymousObservable<TSource>(observer =>
      {
          var buffered = new Queue<System.Reactive.TimeInterval<TSource>>();
          var provider = scheduler.AsStopwatchProvider();
          var watch = provider != null ? provider.StartStopwatch() : new DefaultStopwatch();

          // Emits, oldest first, every queued element whose age has reached the duration.
          Action<TimeSpan> emitExpired = elapsed =>
          {
              while (buffered.Count > 0 && elapsed - buffered.Peek().Interval >= duration)
              {
                  observer.OnNext(buffered.Dequeue().Value);
              }
          };

          return source.Subscribe(
              x =>
              {
                  var arrival = watch.Elapsed;
                  buffered.Enqueue(new System.Reactive.TimeInterval<TSource>(x, arrival));
                  emitExpired(arrival);
              },
              observer.OnError,
              () =>
              {
                  var end = watch.Elapsed;
                  emitExpired(end);
                  observer.OnCompleted();
              }
          );
      });
#endif
  }
  588. #endregion
  589. #region + SkipUntil +
  /// <summary>
  /// Absolute-time SkipUntil overload without an explicit scheduler: forwards to
  /// <c>SkipUntil_</c> using <c>SchedulerDefaults.TimeBasedOperations</c>.
  /// </summary>
  public virtual IObservable<TSource> SkipUntil<TSource>(IObservable<TSource> source, DateTimeOffset startTime)
  {
      var scheduler = SchedulerDefaults.TimeBasedOperations;
      return SkipUntil_<TSource>(source, startTime, scheduler);
  }
  /// <summary>
  /// Absolute-time SkipUntil overload with an explicit scheduler: forwards all arguments
  /// unchanged to <c>SkipUntil_</c>.
  /// </summary>
  public virtual IObservable<TSource> SkipUntil<TSource>(IObservable<TSource> source, DateTimeOffset startTime, IScheduler scheduler)
  {
      return SkipUntil_<TSource>(
          source: source,
          startTime: startTime,
          scheduler: scheduler);
  }
  /// <summary>
  /// Shared implementation behind the absolute-time SkipUntil overloads.
  /// </summary>
  /// <remarks>
  /// Unless NO_PERF is defined: when <paramref name="source"/> is itself a <c>SkipUntil</c>
  /// operator on the same scheduler, its <c>Omega</c> method is asked for the result; otherwise
  /// a new <c>SkipUntil</c> operator is created.
  /// With NO_PERF, elements are dropped until an action scheduled at
  /// <paramref name="startTime"/> flips a flag; errors and completion always pass through.
  /// </remarks>
  private static IObservable<TSource> SkipUntil_<TSource>(IObservable<TSource> source, DateTimeOffset startTime, IScheduler scheduler)
  {
#if !NO_PERF
      var upstream = source as SkipUntil<TSource>;
      if (upstream != null && upstream._scheduler == scheduler)
      {
          return upstream.Omega(startTime);
      }

      return new SkipUntil<TSource>(source, startTime, scheduler);
#else
      return new AnonymousObservable<TSource>(observer =>
      {
          var gateOpen = false;
          var timer = scheduler.Schedule(startTime, () => gateOpen = true);

          var subscription = source.Subscribe(
              x =>
              {
                  if (gateOpen)
                  {
                      observer.OnNext(x);
                  }
              },
              observer.OnError,
              observer.OnCompleted
          );

          return new CompositeDisposable(timer, subscription);
      });
#endif
  }
  623. #endregion
  624. #region + Take +
  /// <summary>
  /// Duration-based Take overload without an explicit scheduler: forwards to <c>Take_</c>
  /// using <c>SchedulerDefaults.TimeBasedOperations</c>.
  /// </summary>
  public virtual IObservable<TSource> Take<TSource>(IObservable<TSource> source, TimeSpan duration)
  {
      var scheduler = SchedulerDefaults.TimeBasedOperations;
      return Take_<TSource>(source, duration, scheduler);
  }
  /// <summary>
  /// Duration-based Take overload with an explicit scheduler: forwards all arguments
  /// unchanged to <c>Take_</c>.
  /// </summary>
  public virtual IObservable<TSource> Take<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
  {
      return Take_<TSource>(
          source: source,
          duration: duration,
          scheduler: scheduler);
  }
  /// <summary>
  /// Shared implementation behind the duration-based Take overloads.
  /// </summary>
  /// <remarks>
  /// Unless NO_PERF is defined: when <paramref name="source"/> is itself a <c>Take</c> operator
  /// on the same scheduler, its <c>Omega</c> method is asked for the result; otherwise a new
  /// <c>Take</c> operator is created.
  /// With NO_PERF, the source is forwarded to the observer while an action scheduled after
  /// <paramref name="duration"/> signals completion; both paths share one lock object.
  /// </remarks>
  private static IObservable<TSource> Take_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
  {
#if !NO_PERF
      var upstream = source as Take<TSource>;
      if (upstream != null && upstream._scheduler == scheduler)
      {
          return upstream.Omega(duration);
      }

      return new Take<TSource>(source, duration, scheduler);
#else
      return new AnonymousObservable<TSource>(observer =>
      {
          var sync = new object();

          var timer = scheduler.Schedule(duration, () =>
          {
              lock (sync)
              {
                  observer.OnCompleted();
              }
          });

          // Source notifications are synchronized on the same lock the timer callback takes.
          var subscription = source.Synchronize(sync).Subscribe(observer);

          return new CompositeDisposable(timer, subscription);
      });
#endif
  }
  656. #endregion
  657. #region + TakeLast +
  /// <summary>
  /// Duration-based TakeLast overload without explicit schedulers: forwards to <c>TakeLast_</c>
  /// using <c>SchedulerDefaults.TimeBasedOperations</c> for timing and
  /// <c>SchedulerDefaults.Iteration</c> for the emission loop.
  /// </summary>
  public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, TimeSpan duration)
  {
      var timerScheduler = SchedulerDefaults.TimeBasedOperations;
      var loopScheduler = SchedulerDefaults.Iteration;
      return TakeLast_<TSource>(source, duration, timerScheduler, loopScheduler);
  }
  /// <summary>
  /// Duration-based TakeLast overload with an explicit timing scheduler: forwards to
  /// <c>TakeLast_</c> using <c>SchedulerDefaults.Iteration</c> for the emission loop.
  /// </summary>
  public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
  {
      var loopScheduler = SchedulerDefaults.Iteration;
      return TakeLast_<TSource>(source, duration, scheduler, loopScheduler);
  }
  /// <summary>
  /// Duration-based TakeLast overload with explicit timing and loop schedulers: forwards all
  /// arguments unchanged to <c>TakeLast_</c>.
  /// </summary>
  public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler timerScheduler, IScheduler loopScheduler)
  {
      return TakeLast_<TSource>(
          source: source,
          duration: duration,
          timerScheduler: timerScheduler,
          loopScheduler: loopScheduler);
  }
  /// <summary>
  /// Shared implementation behind the duration-based TakeLast overloads.
  /// </summary>
  /// <remarks>
  /// Unless NO_PERF is defined, this instantiates the <c>TakeLast</c> operator class directly.
  /// With NO_PERF, every element is queued with the stopwatch time at which it arrived and
  /// elements that are at least <paramref name="duration"/> old are discarded on each arrival
  /// and on completion. After the source completes, the remaining elements are emitted one per
  /// recursive step on <paramref name="loopScheduler"/>, followed by completion.
  /// </remarks>
  private static IObservable<TSource> TakeLast_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler timerScheduler, IScheduler loopScheduler)
  {
#if !NO_PERF
      return new TakeLast<TSource>(source, duration, timerScheduler, loopScheduler);
#else
      return new AnonymousObservable<TSource>(observer =>
      {
          var window = new Queue<System.Reactive.TimeInterval<TSource>>();
          var provider = timerScheduler.AsStopwatchProvider();
          var watch = provider != null ? provider.StartStopwatch() : new DefaultStopwatch();

          // Discards, oldest first, every queued element whose age has reached the duration.
          Action<TimeSpan> dropExpired = elapsed =>
          {
              while (window.Count > 0 && elapsed - window.Peek().Interval >= duration)
              {
                  window.Dequeue();
              }
          };

          var resources = new CompositeDisposable();

          resources.Add(source.Subscribe(
              x =>
              {
                  var arrival = watch.Elapsed;
                  window.Enqueue(new System.Reactive.TimeInterval<TSource>(x, arrival));
                  dropExpired(arrival);
              },
              observer.OnError,
              () =>
              {
                  var end = watch.Elapsed;
                  dropExpired(end);

                  resources.Add(loopScheduler.Schedule(rec =>
                  {
                      if (window.Count == 0)
                      {
                          observer.OnCompleted();
                          return;
                      }

                      observer.OnNext(window.Dequeue().Value);
                      rec();
                  }));
              }
          ));

          return resources;
      });
#endif
  }
  716. public virtual IObservable<IList<TSource>> TakeLastBuffer<TSource>(IObservable<TSource> source, TimeSpan duration)
  717. {
  718. return TakeLastBuffer_<TSource>(source, duration, SchedulerDefaults.TimeBasedOperations);
  719. }
  720. public virtual IObservable<IList<TSource>> TakeLastBuffer<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
  721. {
  722. return TakeLastBuffer_<TSource>(source, duration, scheduler);
  723. }
  724. private static IObservable<IList<TSource>> TakeLastBuffer_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
  725. {
  726. #if !NO_PERF
  727. return new TakeLastBuffer<TSource>(source, duration, scheduler);
  728. #else
  729. return new AnonymousObservable<IList<TSource>>(observer =>
  730. {
  731. var q = new Queue<System.Reactive.TimeInterval<TSource>>();
  732. var swp = scheduler.AsStopwatchProvider();
  733. var sw = swp != null ? swp.StartStopwatch() : new DefaultStopwatch();
  734. return source.Subscribe(
  735. x =>
  736. {
  737. var now = sw.Elapsed;
  738. q.Enqueue(new System.Reactive.TimeInterval<TSource>(x, now));
  739. while (q.Count > 0 && now - q.Peek().Interval >= duration)
  740. q.Dequeue();
  741. },
  742. observer.OnError,
  743. () =>
  744. {
  745. var now = sw.Elapsed;
  746. var res = new List<TSource>();
  747. while (q.Count > 0)
  748. {
  749. var next = q.Dequeue();
  750. if (now - next.Interval <= duration)
  751. res.Add(next.Value);
  752. }
  753. observer.OnNext(res);
  754. observer.OnCompleted();
  755. }
  756. );
  757. });
  758. #endif
  759. }
  760. #endregion
  761. #region + TakeUntil +
  762. public virtual IObservable<TSource> TakeUntil<TSource>(IObservable<TSource> source, DateTimeOffset endTime)
  763. {
  764. return TakeUntil_<TSource>(source, endTime, SchedulerDefaults.TimeBasedOperations);
  765. }
  766. public virtual IObservable<TSource> TakeUntil<TSource>(IObservable<TSource> source, DateTimeOffset endTime, IScheduler scheduler)
  767. {
  768. return TakeUntil_<TSource>(source, endTime, scheduler);
  769. }
  770. private static IObservable<TSource> TakeUntil_<TSource>(IObservable<TSource> source, DateTimeOffset endTime, IScheduler scheduler)
  771. {
  772. #if !NO_PERF
  773. var takeUntil = source as TakeUntil<TSource>;
  774. if (takeUntil != null && takeUntil._scheduler == scheduler)
  775. return takeUntil.Omega(endTime);
  776. return new TakeUntil<TSource>(source, endTime, scheduler);
  777. #else
  778. return new AnonymousObservable<TSource>(observer =>
  779. {
  780. var gate = new object();
  781. var t = scheduler.Schedule(endTime, () =>
  782. {
  783. lock (gate)
  784. {
  785. observer.OnCompleted();
  786. }
  787. });
  788. var d = source.Synchronize(gate).Subscribe(observer);
  789. return new CompositeDisposable(t, d);
  790. });
  791. #endif
  792. }
  793. #endregion
  794. #region + Throttle +
  795. public virtual IObservable<TSource> Throttle<TSource>(IObservable<TSource> source, TimeSpan dueTime)
  796. {
  797. return Throttle_<TSource>(source, dueTime, SchedulerDefaults.TimeBasedOperations);
  798. }
  799. public virtual IObservable<TSource> Throttle<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
  800. {
  801. return Throttle_<TSource>(source, dueTime, scheduler);
  802. }
/// <summary>
/// Shared implementation of the TimeSpan-based Throttle overloads.
/// </summary>
/// <remarks>
/// Without NO_PERF the arguments are wrapped in a <c>Throttle&lt;TSource&gt;</c> instance.
/// The NO_PERF fallback remembers only the most recent element and starts a fresh timer of
/// <paramref name="dueTime"/> for each one; a timer forwards the element only when no later
/// notification has changed <c>id</c> since the timer was created.
/// </remarks>
private static IObservable<TSource> Throttle_<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
{
#if !NO_PERF
    return new Throttle<TSource>(source, dueTime, scheduler);
#else
    return new AnonymousObservable<TSource>(observer =>
    {
        // gate protects value/hasValue/id and serializes all calls to the observer.
        var gate = new object();
        var value = default(TSource);
        var hasValue = false;
        // Holds the currently pending timer; assigning a new one replaces the previous.
        var cancelable = new SerialDisposable();
        // Generation counter, bumped on every source notification.
        var id = 0UL;
        var subscription = source.Subscribe(x =>
            {
                ulong currentid;
                lock (gate)
                {
                    hasValue = true;
                    value = x;
                    id = unchecked(id + 1);
                    currentid = id;
                }
                // Register the placeholder first so the SerialDisposable already tracks this
                // timer before the scheduler call returns.
                var d = new SingleAssignmentDisposable();
                cancelable.Disposable = d;
                d.Disposable = scheduler.Schedule(dueTime, () =>
                {
                    lock (gate)
                    {
                        // Emit only if this timer still belongs to the latest element.
                        if (hasValue && id == currentid)
                            observer.OnNext(value);
                        hasValue = false;
                    }
                });
            },
            exception =>
            {
                // Cancel the pending timer, then forward the error under the gate.
                cancelable.Dispose();
                lock (gate)
                {
                    observer.OnError(exception);
                    hasValue = false;
                    id = unchecked(id + 1);
                }
            },
            () =>
            {
                // Cancel the pending timer and flush the held element (if any) before completing.
                cancelable.Dispose();
                lock (gate)
                {
                    if (hasValue)
                        observer.OnNext(value);
                    observer.OnCompleted();
                    hasValue = false;
                    id = unchecked(id + 1);
                }
            });
        return new CompositeDisposable(subscription, cancelable);
    });
#endif
}
/// <summary>
/// Throttle whose quiet period for each element is defined by an observable obtained from
/// <paramref name="throttleDurationSelector"/>.
/// </summary>
/// <remarks>
/// Without NO_PERF the arguments are wrapped in a <c>Throttle&lt;TSource, TThrottle&gt;</c> instance.
/// The NO_PERF fallback remembers only the most recent element and subscribes to that element's
/// throttle observable; the element is forwarded when the throttle produces a value or completes,
/// provided no later element has changed <c>id</c> in the meantime.
/// </remarks>
public virtual IObservable<TSource> Throttle<TSource, TThrottle>(IObservable<TSource> source, Func<TSource, IObservable<TThrottle>> throttleDurationSelector)
{
#if !NO_PERF
    return new Throttle<TSource, TThrottle>(source, throttleDurationSelector);
#else
    return new AnonymousObservable<TSource>(observer =>
    {
        // gate protects value/hasValue/id and serializes all calls to the observer.
        var gate = new object();
        var value = default(TSource);
        var hasValue = false;
        // Holds the subscription to the current throttle observable; assigning replaces the previous.
        var cancelable = new SerialDisposable();
        // Generation counter, bumped on every accepted source element and on termination.
        var id = 0UL;
        var subscription = source.Subscribe(
            x =>
            {
                var throttle = default(IObservable<TThrottle>);
                try
                {
                    throttle = throttleDurationSelector(x);
                }
                catch (Exception error)
                {
                    // A throwing selector is reported as an error on the result sequence.
                    lock (gate)
                        observer.OnError(error);
                    return;
                }
                ulong currentid;
                lock (gate)
                {
                    hasValue = true;
                    value = x;
                    id = unchecked(id + 1);
                    currentid = id;
                }
                var d = new SingleAssignmentDisposable();
                cancelable.Disposable = d;
                d.Disposable = throttle.Subscribe(
                    _ =>
                    {
                        lock (gate)
                        {
                            // Emit only if this throttle still belongs to the latest element.
                            if (hasValue && id == currentid)
                                observer.OnNext(value);
                            hasValue = false;
                            // The first throttle value ends this throttle subscription.
                            d.Dispose();
                        }
                    },
                    exception =>
                    {
                        lock (gate)
                        {
                            observer.OnError(exception);
                        }
                    },
                    () =>
                    {
                        // Completion of the throttle is handled the same way as a value.
                        lock (gate)
                        {
                            if (hasValue && id == currentid)
                                observer.OnNext(value);
                            hasValue = false;
                            d.Dispose();
                        }
                    }
                );
            },
            exception =>
            {
                // Cancel the pending throttle, then forward the error under the gate.
                cancelable.Dispose();
                lock (gate)
                {
                    observer.OnError(exception);
                    hasValue = false;
                    id = unchecked(id + 1);
                }
            },
            () =>
            {
                // Cancel the pending throttle and flush the held element (if any) before completing.
                cancelable.Dispose();
                lock (gate)
                {
                    if (hasValue)
                        observer.OnNext(value);
                    observer.OnCompleted();
                    hasValue = false;
                    id = unchecked(id + 1);
                }
            });
        return new CompositeDisposable(subscription, cancelable);
    });
#endif
}
  955. #endregion
  956. #region + TimeInterval +
  957. public virtual IObservable<System.Reactive.TimeInterval<TSource>> TimeInterval<TSource>(IObservable<TSource> source)
  958. {
  959. return TimeInterval_<TSource>(source, SchedulerDefaults.TimeBasedOperations);
  960. }
  961. public virtual IObservable<System.Reactive.TimeInterval<TSource>> TimeInterval<TSource>(IObservable<TSource> source, IScheduler scheduler)
  962. {
  963. return TimeInterval_<TSource>(source, scheduler);
  964. }
  965. #if !NO_PERF
  966. private static IObservable<System.Reactive.TimeInterval<TSource>> TimeInterval_<TSource>(IObservable<TSource> source, IScheduler scheduler)
  967. {
  968. return new TimeInterval<TSource>(source, scheduler);
  969. }
  970. #else
  971. private IObservable<System.Reactive.TimeInterval<TSource>> TimeInterval_<TSource>(IObservable<TSource> source, IScheduler scheduler)
  972. {
  973. return Defer(() =>
  974. {
  975. var last = scheduler.Now;
  976. return source.Select(x =>
  977. {
  978. var now = scheduler.Now;
  979. var span = now.Subtract(last);
  980. last = now;
  981. return new System.Reactive.TimeInterval<TSource>(x, span);
  982. });
  983. });
  984. }
  985. #endif
  986. #endregion
  987. #region + Timeout +
  988. #region TimeSpan
  989. public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, TimeSpan dueTime)
  990. {
  991. return Timeout_<TSource>(source, dueTime, Observable.Throw<TSource>(new TimeoutException()), SchedulerDefaults.TimeBasedOperations);
  992. }
  993. public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
  994. {
  995. return Timeout_<TSource>(source, dueTime, Observable.Throw<TSource>(new TimeoutException()), scheduler);
  996. }
  997. public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, TimeSpan dueTime, IObservable<TSource> other)
  998. {
  999. return Timeout_<TSource>(source, dueTime, other, SchedulerDefaults.TimeBasedOperations);
  1000. }
  1001. public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, TimeSpan dueTime, IObservable<TSource> other, IScheduler scheduler)
  1002. {
  1003. return Timeout_<TSource>(source, dueTime, other, scheduler);
  1004. }
/// <summary>
/// Shared implementation of the TimeSpan-based Timeout overloads.
/// </summary>
/// <remarks>
/// Without NO_PERF the arguments are wrapped in a <c>Timeout&lt;TSource&gt;</c> instance.
/// The NO_PERF fallback starts a timer of <paramref name="dueTime"/> at subscription and again after
/// every forwarded element. If a timer fires before the source produced another notification, the
/// subscription is switched over to <paramref name="other"/> and later source notifications are ignored.
/// </remarks>
private static IObservable<TSource> Timeout_<TSource>(IObservable<TSource> source, TimeSpan dueTime, IObservable<TSource> other, IScheduler scheduler)
{
#if !NO_PERF
    return new Timeout<TSource>(source, dueTime, other, scheduler);
#else
    return new AnonymousObservable<TSource>(observer =>
    {
        // subscription initially holds the source subscription and later the one to 'other'.
        var subscription = new SerialDisposable();
        var timer = new SerialDisposable();
        var original = new SingleAssignmentDisposable();
        subscription.Disposable = original;
        var gate = new object();
        // id is bumped by every source notification that wins; a timer compares against its snapshot.
        var id = 0UL;
        // Set once a timer has won; from then on source notifications are dropped.
        var switched = false;
        Action createTimer = () =>
        {
            var myid = id;
            timer.Disposable = scheduler.Schedule(dueTime, () =>
            {
                var timerWins = false;
                lock (gate)
                {
                    // The timer wins only if no source notification changed id since it was created.
                    switched = (id == myid);
                    timerWins = switched;
                }
                // Subscribing to 'other' happens outside the lock.
                if (timerWins)
                    subscription.Disposable = other.Subscribe(observer);
            });
        };
        // Timer covering the wait for the first element.
        createTimer();
        original.Disposable = source.Subscribe(
            x =>
            {
                var onNextWins = false;
                lock (gate)
                {
                    onNextWins = !switched;
                    if (onNextWins)
                    {
                        // Invalidates the timer that is currently pending.
                        id = unchecked(id + 1);
                    }
                }
                if (onNextWins)
                {
                    observer.OnNext(x);
                    // Restart the timeout for the next element.
                    createTimer();
                }
            },
            exception =>
            {
                var onErrorWins = false;
                lock (gate)
                {
                    onErrorWins = !switched;
                    if (onErrorWins)
                    {
                        id = unchecked(id + 1);
                    }
                }
                if (onErrorWins)
                    observer.OnError(exception);
            },
            () =>
            {
                var onCompletedWins = false;
                lock (gate)
                {
                    onCompletedWins = !switched;
                    if (onCompletedWins)
                    {
                        id = unchecked(id + 1);
                    }
                }
                if (onCompletedWins)
                    observer.OnCompleted();
            });
        return new CompositeDisposable(subscription, timer);
    });
#endif
}
  1085. #endregion
  1086. #region DateTimeOffset
  1087. public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, DateTimeOffset dueTime)
  1088. {
  1089. return Timeout_<TSource>(source, dueTime, Observable.Throw<TSource>(new TimeoutException()), SchedulerDefaults.TimeBasedOperations);
  1090. }
  1091. public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
  1092. {
  1093. return Timeout_<TSource>(source, dueTime, Observable.Throw<TSource>(new TimeoutException()), scheduler);
  1094. }
  1095. public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IObservable<TSource> other)
  1096. {
  1097. return Timeout_<TSource>(source, dueTime, other, SchedulerDefaults.TimeBasedOperations);
  1098. }
  1099. public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IObservable<TSource> other, IScheduler scheduler)
  1100. {
  1101. return Timeout_<TSource>(source, dueTime, other, scheduler);
  1102. }
/// <summary>
/// Shared implementation of the DateTimeOffset-based Timeout overloads.
/// </summary>
/// <remarks>
/// Without NO_PERF the arguments are wrapped in a <c>Timeout&lt;TSource&gt;</c> instance.
/// The NO_PERF fallback schedules a single timer at <paramref name="dueTime"/>. Whichever of the timer
/// or a terminal source notification sets <c>switched</c> first wins: if the timer wins, the
/// subscription is switched over to <paramref name="other"/>; if the source terminates first, the
/// timer does nothing when it fires.
/// </remarks>
private static IObservable<TSource> Timeout_<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IObservable<TSource> other, IScheduler scheduler)
{
#if !NO_PERF
    return new Timeout<TSource>(source, dueTime, other, scheduler);
#else
    return new AnonymousObservable<TSource>(observer =>
    {
        // subscription initially holds the source subscription and later the one to 'other'.
        var subscription = new SerialDisposable();
        var original = new SingleAssignmentDisposable();
        subscription.Disposable = original;
        var gate = new object();
        // True once either the timer fired or the source terminated.
        var switched = false;
        var timer = scheduler.Schedule(dueTime, () =>
        {
            var timerWins = false;
            lock (gate)
            {
                timerWins = !switched;
                switched = true;
            }
            // Subscribing to 'other' happens outside the lock.
            if (timerWins)
                subscription.Disposable = other.Subscribe(observer);
        });
        original.Disposable = source.Subscribe(
            x =>
            {
                // Elements are forwarded under the lock and do not reset the deadline.
                lock (gate)
                {
                    if (!switched)
                        observer.OnNext(x);
                }
            },
            exception =>
            {
                var onErrorWins = false;
                lock (gate)
                {
                    onErrorWins = !switched;
                    switched = true;
                }
                if (onErrorWins)
                    observer.OnError(exception);
            },
            () =>
            {
                var onCompletedWins = false;
                lock (gate)
                {
                    onCompletedWins = !switched;
                    switched = true;
                }
                if (onCompletedWins)
                    observer.OnCompleted();
            });
        return new CompositeDisposable(subscription, timer);
    });
#endif
}
  1161. #endregion
  1162. #region Duration selector
  1163. public virtual IObservable<TSource> Timeout<TSource, TTimeout>(IObservable<TSource> source, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector)
  1164. {
  1165. return Timeout_<TSource, TTimeout>(source, Observable.Never<TTimeout>(), timeoutDurationSelector, Observable.Throw<TSource>(new TimeoutException()));
  1166. }
  1167. public virtual IObservable<TSource> Timeout<TSource, TTimeout>(IObservable<TSource> source, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector, IObservable<TSource> other)
  1168. {
  1169. return Timeout_<TSource, TTimeout>(source, Observable.Never<TTimeout>(), timeoutDurationSelector, other);
  1170. }
  1171. public virtual IObservable<TSource> Timeout<TSource, TTimeout>(IObservable<TSource> source, IObservable<TTimeout> firstTimeout, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector)
  1172. {
  1173. return Timeout_<TSource, TTimeout>(source, firstTimeout, timeoutDurationSelector, Observable.Throw<TSource>(new TimeoutException()));
  1174. }
  1175. public virtual IObservable<TSource> Timeout<TSource, TTimeout>(IObservable<TSource> source, IObservable<TTimeout> firstTimeout, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector, IObservable<TSource> other)
  1176. {
  1177. return Timeout_<TSource, TTimeout>(source, firstTimeout, timeoutDurationSelector, other);
  1178. }
/// <summary>
/// Shared implementation of the selector-based Timeout overloads.
/// </summary>
/// <remarks>
/// Without NO_PERF the arguments are wrapped in a <c>Timeout&lt;TSource, TTimeout&gt;</c> instance.
/// The NO_PERF fallback subscribes to <paramref name="firstTimeout"/> up front and, after each
/// forwarded element, to the observable returned by <paramref name="timeoutDurationSelector"/>.
/// When the current timeout observable produces a value or completes before the next source
/// notification, the subscription is switched over to <paramref name="other"/>; an error from the
/// timeout observable is forwarded to the observer under the same condition.
/// </remarks>
private static IObservable<TSource> Timeout_<TSource, TTimeout>(IObservable<TSource> source, IObservable<TTimeout> firstTimeout, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector, IObservable<TSource> other)
{
#if !NO_PERF
    return new Timeout<TSource, TTimeout>(source, firstTimeout, timeoutDurationSelector, other);
#else
    return new AnonymousObservable<TSource>(observer =>
    {
        // subscription initially holds the source subscription and later the one to 'other'.
        var subscription = new SerialDisposable();
        var timer = new SerialDisposable();
        var original = new SingleAssignmentDisposable();
        subscription.Disposable = original;
        var gate = new object();
        // id is bumped by every source notification that wins; a timeout compares against its snapshot.
        var id = 0UL;
        // Set once a timeout has won; from then on source notifications are dropped.
        var switched = false;
        Action<IObservable<TTimeout>> setTimer = timeout =>
        {
            var myid = id;
            // Returns true when no source notification changed id since this timeout was armed.
            Func<bool> timerWins = () =>
            {
                var res = false;
                lock (gate)
                {
                    switched = (id == myid);
                    res = switched;
                }
                return res;
            };
            var d = new SingleAssignmentDisposable();
            timer.Disposable = d;
            d.Disposable = timeout.Subscribe(
                _ =>
                {
                    if (timerWins())
                        subscription.Disposable = other.Subscribe(observer);
                    // Only the first value of the timeout observable is relevant.
                    d.Dispose();
                },
                error =>
                {
                    if (timerWins())
                        observer.OnError(error);
                },
                () =>
                {
                    // Completion of the timeout observable also counts as a timeout.
                    if (timerWins())
                        subscription.Disposable = other.Subscribe(observer);
                }
            );
        };
        // Timeout covering the wait for the first element.
        setTimer(firstTimeout);
        // Returns true when the source notification arrived before any timeout won, and
        // invalidates the currently armed timeout by bumping id.
        Func<bool> observerWins = () =>
        {
            var res = false;
            lock (gate)
            {
                res = !switched;
                if (res)
                {
                    id = unchecked(id + 1);
                }
            }
            return res;
        };
        original.Disposable = source.Subscribe(
            x =>
            {
                if (observerWins())
                {
                    observer.OnNext(x);
                    var timeout = default(IObservable<TTimeout>);
                    try
                    {
                        timeout = timeoutDurationSelector(x);
                    }
                    catch (Exception error)
                    {
                        // A throwing selector is reported as an error on the result sequence.
                        observer.OnError(error);
                        return;
                    }
                    setTimer(timeout);
                }
            },
            exception =>
            {
                if (observerWins())
                    observer.OnError(exception);
            },
            () =>
            {
                if (observerWins())
                    observer.OnCompleted();
            }
        );
        return new CompositeDisposable(subscription, timer);
    });
#endif
}
  1275. #endregion
  1276. #endregion
  1277. #region + Timer +
  1278. public virtual IObservable<long> Timer(TimeSpan dueTime)
  1279. {
  1280. return Timer_(dueTime, SchedulerDefaults.TimeBasedOperations);
  1281. }
  1282. public virtual IObservable<long> Timer(DateTimeOffset dueTime)
  1283. {
  1284. return Timer_(dueTime, SchedulerDefaults.TimeBasedOperations);
  1285. }
  1286. public virtual IObservable<long> Timer(TimeSpan dueTime, TimeSpan period)
  1287. {
  1288. return Timer_(dueTime, period, SchedulerDefaults.TimeBasedOperations);
  1289. }
  1290. public virtual IObservable<long> Timer(DateTimeOffset dueTime, TimeSpan period)
  1291. {
  1292. return Timer_(dueTime, period, SchedulerDefaults.TimeBasedOperations);
  1293. }
  1294. public virtual IObservable<long> Timer(TimeSpan dueTime, IScheduler scheduler)
  1295. {
  1296. return Timer_(dueTime, scheduler);
  1297. }
  1298. public virtual IObservable<long> Timer(DateTimeOffset dueTime, IScheduler scheduler)
  1299. {
  1300. return Timer_(dueTime, scheduler);
  1301. }
  1302. public virtual IObservable<long> Timer(TimeSpan dueTime, TimeSpan period, IScheduler scheduler)
  1303. {
  1304. return Timer_(dueTime, period, scheduler);
  1305. }
  1306. public virtual IObservable<long> Timer(DateTimeOffset dueTime, TimeSpan period, IScheduler scheduler)
  1307. {
  1308. return Timer_(dueTime, period, scheduler);
  1309. }
  1310. private static IObservable<long> Timer_(TimeSpan dueTime, IScheduler scheduler)
  1311. {
  1312. #if !NO_PERF
  1313. return new Timer(dueTime, null, scheduler);
  1314. #else
  1315. var d = Normalize(dueTime);
  1316. return new AnonymousObservable<long>(observer =>
  1317. scheduler.Schedule(d, () =>
  1318. {
  1319. observer.OnNext(0);
  1320. observer.OnCompleted();
  1321. }));
  1322. #endif
  1323. }
  1324. #if !NO_PERF
  1325. private static IObservable<long> Timer_(TimeSpan dueTime, TimeSpan period, IScheduler scheduler)
  1326. {
  1327. return new Timer(dueTime, period, scheduler);
  1328. }
  1329. #else
  1330. private IObservable<long> Timer_(TimeSpan dueTime, TimeSpan period, IScheduler scheduler)
  1331. {
  1332. var p = Normalize(period);
  1333. return Defer(() => Timer(scheduler.Now + dueTime, p, scheduler));
  1334. }
  1335. #endif
  1336. private static IObservable<long> Timer_(DateTimeOffset dueTime, IScheduler scheduler)
  1337. {
  1338. #if !NO_PERF
  1339. return new Timer(dueTime, null, scheduler);
  1340. #else
  1341. return new AnonymousObservable<long>(observer =>
  1342. scheduler.Schedule(dueTime, () =>
  1343. {
  1344. observer.OnNext(0);
  1345. observer.OnCompleted();
  1346. }));
  1347. #endif
  1348. }
  1349. private static IObservable<long> Timer_(DateTimeOffset dueTime, TimeSpan period, IScheduler scheduler)
  1350. {
  1351. #if !NO_PERF
  1352. return new Timer(dueTime, period, scheduler);
  1353. #else
  1354. var p = Normalize(period);
  1355. return new AnonymousObservable<long>(observer =>
  1356. {
  1357. var d = dueTime;
  1358. var count = 0L;
  1359. return scheduler.Schedule(d, self =>
  1360. {
  1361. if (p > TimeSpan.Zero)
  1362. {
  1363. var now = scheduler.Now;
  1364. d = d + p;
  1365. if (d <= now)
  1366. d = now + p;
  1367. }
  1368. observer.OnNext(count);
  1369. count = unchecked(count + 1);
  1370. self(d);
  1371. });
  1372. });
  1373. #endif
  1374. }
  1375. #endregion
  1376. #region + Timestamp +
  1377. public virtual IObservable<Timestamped<TSource>> Timestamp<TSource>(IObservable<TSource> source)
  1378. {
  1379. return Timestamp_<TSource>(source, SchedulerDefaults.TimeBasedOperations);
  1380. }
  1381. public virtual IObservable<Timestamped<TSource>> Timestamp<TSource>(IObservable<TSource> source, IScheduler scheduler)
  1382. {
  1383. return Timestamp_<TSource>(source, scheduler);
  1384. }
  1385. private static IObservable<Timestamped<TSource>> Timestamp_<TSource>(IObservable<TSource> source, IScheduler scheduler)
  1386. {
  1387. #if !NO_PERF
  1388. return new Timestamp<TSource>(source, scheduler);
  1389. #else
  1390. return source.Select(x => new Timestamped<TSource>(x, scheduler.Now));
  1391. #endif
  1392. }
  1393. #endregion
  1394. #region + Window +
  1395. #region TimeSpan only
  1396. public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan)
  1397. {
  1398. return Window_<TSource>(source, timeSpan, timeSpan, SchedulerDefaults.TimeBasedOperations);
  1399. }
  1400. public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, IScheduler scheduler)
  1401. {
  1402. return Window_<TSource>(source, timeSpan, timeSpan, scheduler);
  1403. }
  1404. public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift)
  1405. {
  1406. return Window_<TSource>(source, timeSpan, timeShift, SchedulerDefaults.TimeBasedOperations);
  1407. }
  1408. public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
  1409. {
  1410. return Window_<TSource>(source, timeSpan, timeShift, scheduler);
  1411. }
/// <summary>
/// Shared implementation of the Window overloads driven by a window length and a shift.
/// </summary>
/// <remarks>
/// Without NO_PERF the arguments are wrapped in a <c>Window&lt;TSource&gt;</c> instance.
/// The NO_PERF fallback keeps a queue of open window subjects. A chain of timers fires at every
/// shift boundary (opening a new window) and every span boundary (closing the oldest window);
/// source notifications are broadcast to all windows that are open at that moment.
/// </remarks>
private static IObservable<IObservable<TSource>> Window_<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
{
#if !NO_PERF
    return new Window<TSource>(source, timeSpan, timeShift, scheduler);
#else
    return new AnonymousObservable<IObservable<TSource>>(observer =>
    {
        // totalTime: offset already covered by timers; nextShift/nextSpan: offsets of the next
        // window opening and the next window closing, all measured from subscription.
        var totalTime = TimeSpan.Zero;
        var nextShift = timeShift;
        var nextSpan = timeSpan;
        // gate protects the queue and serializes calls to the observer and the window subjects.
        var gate = new object();
        var q = new Queue<ISubject<TSource>>();
        var timerD = new SerialDisposable();
        var groupDisposable = new CompositeDisposable(2) { timerD };
        // Each window handed out below is wrapped with AddRef on this disposable.
        var refCountDisposable = new RefCountDisposable(groupDisposable);
        var createTimer = default(Action);
        createTimer = () =>
        {
            var m = new SingleAssignmentDisposable();
            timerD.Disposable = m;
            // Decide which boundary comes next; when both coincide the tick does both.
            var isSpan = false;
            var isShift = false;
            if (nextSpan == nextShift)
            {
                isSpan = true;
                isShift = true;
            }
            else if (nextSpan < nextShift)
                isSpan = true;
            else
                isShift = true;
            // Schedule relative to the offset that previous timers already accounted for.
            var newTotalTime = isSpan ? nextSpan : nextShift;
            var ts = newTotalTime - totalTime;
            totalTime = newTotalTime;
            // Both boundaries recur every timeShift.
            if (isSpan)
                nextSpan += timeShift;
            if (isShift)
                nextShift += timeShift;
            m.Disposable = scheduler.Schedule(ts, () =>
            {
                lock (gate)
                {
                    // Open the new window before closing the old one.
                    if (isShift)
                    {
                        var s = new Subject<TSource>();
                        q.Enqueue(s);
                        observer.OnNext(s.AddRef(refCountDisposable));
                    }
                    if (isSpan)
                    {
                        var s = q.Dequeue();
                        s.OnCompleted();
                    }
                }
                // Arm the timer for the next boundary.
                createTimer();
            });
        };
        // The first window is opened immediately, before subscribing to the source.
        q.Enqueue(new Subject<TSource>());
        observer.OnNext(q.Peek().AddRef(refCountDisposable));
        createTimer();
        groupDisposable.Add(source.Subscribe(
            x =>
            {
                lock (gate)
                {
                    foreach (var s in q)
                        s.OnNext(x);
                }
            },
            exception =>
            {
                // Terminate every open window, then the outer sequence.
                lock (gate)
                {
                    foreach (var s in q)
                        s.OnError(exception);
                    observer.OnError(exception);
                }
            },
            () =>
            {
                // Terminate every open window, then the outer sequence.
                lock (gate)
                {
                    foreach (var s in q)
                        s.OnCompleted();
                    observer.OnCompleted();
                }
            }
        ));
        return refCountDisposable;
    });
#endif
}
  1504. #endregion
  1505. #region TimeSpan + int
  1506. public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count)
  1507. {
  1508. return Window_<TSource>(source, timeSpan, count, SchedulerDefaults.TimeBasedOperations);
  1509. }
  1510. public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
  1511. {
  1512. return Window_<TSource>(source, timeSpan, count, scheduler);
  1513. }
/// <summary>
/// Shared implementation of the Window overloads bounded by both a time span and an element count.
/// </summary>
/// <remarks>
/// Without NO_PERF the arguments are wrapped in a <c>Window&lt;TSource&gt;</c> instance.
/// The NO_PERF fallback keeps a single current window. It is replaced either when its timer of
/// <paramref name="timeSpan"/> fires or when it has received <paramref name="count"/> elements,
/// whichever happens first; every replacement starts a fresh timer for the new window.
/// </remarks>
private static IObservable<IObservable<TSource>> Window_<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
{
#if !NO_PERF
    return new Window<TSource>(source, timeSpan, count, scheduler);
#else
    return new AnonymousObservable<IObservable<TSource>>(observer =>
    {
        // gate protects s/n/windowId and serializes calls to the observer and the window subject.
        var gate = new object();
        // s: current window; n: elements pushed into it; windowId: generation of the current window.
        var s = default(ISubject<TSource>);
        var n = 0;
        var windowId = 0;
        var timerD = new SerialDisposable();
        var groupDisposable = new CompositeDisposable(2) { timerD };
        // Each window handed out below is wrapped with AddRef on this disposable.
        var refCountDisposable = new RefCountDisposable(groupDisposable);
        var createTimer = default(Action<int>);
        createTimer = id =>
        {
            var m = new SingleAssignmentDisposable();
            timerD.Disposable = m;
            m.Disposable = scheduler.Schedule(timeSpan, () =>
            {
                var newId = 0;
                lock (gate)
                {
                    // Stale timer: the window it was created for has already been replaced.
                    if (id != windowId)
                        return;
                    n = 0;
                    newId = ++windowId;
                    s.OnCompleted();
                    s = new Subject<TSource>();
                    observer.OnNext(s.AddRef(refCountDisposable));
                }
                // Arm the timer for the new window outside the lock.
                createTimer(newId);
            });
        };
        // The first window is opened immediately, before subscribing to the source.
        s = new Subject<TSource>();
        observer.OnNext(s.AddRef(refCountDisposable));
        createTimer(0);
        groupDisposable.Add(source.Subscribe(
            x =>
            {
                var newWindow = false;
                var newId = 0;
                lock (gate)
                {
                    s.OnNext(x);
                    n++;
                    // Count limit reached: close this window and open the next one.
                    if (n == count)
                    {
                        newWindow = true;
                        n = 0;
                        newId = ++windowId;
                        s.OnCompleted();
                        s = new Subject<TSource>();
                        observer.OnNext(s.AddRef(refCountDisposable));
                    }
                }
                // Replace the timer so the new window gets a full timeSpan.
                if (newWindow)
                    createTimer(newId);
            },
            exception =>
            {
                lock (gate)
                {
                    s.OnError(exception);
                    observer.OnError(exception);
                }
            },
            () =>
            {
                lock (gate)
                {
                    s.OnCompleted();
                    observer.OnCompleted();
                }
            }
        ));
        return refCountDisposable;
    });
#endif
}
  1595. #endregion
  1596. #endregion
  1597. #region |> Helpers <|
  1598. #if NO_PERF
  1599. private static TimeSpan Normalize(TimeSpan timeSpan)
  1600. {
  1601. if (timeSpan.CompareTo(TimeSpan.Zero) < 0)
  1602. return TimeSpan.Zero;
  1603. return timeSpan;
  1604. }
  1605. #endif
  1606. #endregion
  1607. }
  1608. }