SkipLast.cs 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Concurrency;
  6. using System.Threading.Tasks;
  7. namespace System.Reactive.Linq
  8. {
  9. public partial class AsyncObservable
  10. {
  11. public static IAsyncObservable<TSource> SkipLast<TSource>(this IAsyncObservable<TSource> source, int count)
  12. {
  13. if (source == null)
  14. throw new ArgumentNullException(nameof(source));
  15. if (count < 0)
  16. throw new ArgumentOutOfRangeException(nameof(count));
  17. if (count == 0)
  18. {
  19. return source;
  20. }
  21. return Create(
  22. source,
  23. count,
  24. default(TSource),
  25. (source, count, observer) => source.SubscribeSafeAsync(AsyncObserver.SkipLast(observer, count)));
  26. }
  27. public static IAsyncObservable<TSource> SkipLast<TSource>(this IAsyncObservable<TSource> source, TimeSpan duration)
  28. {
  29. if (source == null)
  30. throw new ArgumentNullException(nameof(source));
  31. if (duration < TimeSpan.Zero)
  32. throw new ArgumentOutOfRangeException(nameof(duration));
  33. if (duration == TimeSpan.Zero)
  34. {
  35. return source;
  36. }
  37. return Create(
  38. source,
  39. duration,
  40. default(TSource),
  41. (source, duration, observer) => source.SubscribeSafeAsync(AsyncObserver.SkipLast(observer, duration)));
  42. }
  43. public static IAsyncObservable<TSource> SkipLast<TSource>(this IAsyncObservable<TSource> source, TimeSpan duration, IClock clock)
  44. {
  45. if (source == null)
  46. throw new ArgumentNullException(nameof(source));
  47. if (duration < TimeSpan.Zero)
  48. throw new ArgumentOutOfRangeException(nameof(duration));
  49. if (clock == null)
  50. throw new ArgumentNullException(nameof(clock));
  51. if (duration == TimeSpan.Zero)
  52. {
  53. return source;
  54. }
  55. return Create(
  56. source,
  57. (duration, clock),
  58. default(TSource),
  59. (source, state, observer) => source.SubscribeSafeAsync(AsyncObserver.SkipLast(observer, state.duration, state.clock)));
  60. }
  61. }
  62. public partial class AsyncObserver
  63. {
  64. public static IAsyncObserver<TSource> SkipLast<TSource>(IAsyncObserver<TSource> observer, int count)
  65. {
  66. if (observer == null)
  67. throw new ArgumentNullException(nameof(observer));
  68. if (count <= 0)
  69. throw new ArgumentOutOfRangeException(nameof(count));
  70. var queue = new Queue<TSource>();
  71. return Create<TSource>(
  72. async x =>
  73. {
  74. queue.Enqueue(x);
  75. if (queue.Count > count)
  76. {
  77. await observer.OnNextAsync(queue.Dequeue()).ConfigureAwait(false);
  78. }
  79. },
  80. observer.OnErrorAsync,
  81. observer.OnCompletedAsync
  82. );
  83. }
  84. public static IAsyncObserver<TSource> SkipLast<TSource>(IAsyncObserver<TSource> observer, TimeSpan duration) => SkipLast(observer, duration, Clock.Default);
  85. public static IAsyncObserver<TSource> SkipLast<TSource>(IAsyncObserver<TSource> observer, TimeSpan duration, IClock clock)
  86. {
  87. if (observer == null)
  88. throw new ArgumentNullException(nameof(observer));
  89. if (duration < TimeSpan.Zero)
  90. throw new ArgumentOutOfRangeException(nameof(duration));
  91. if (clock == null)
  92. throw new ArgumentNullException(nameof(clock));
  93. var queue = new Queue<Timestamped<TSource>>();
  94. async Task FlushAsync(DateTimeOffset now)
  95. {
  96. while (queue.Count > 0 && now - queue.Peek().Timestamp >= duration)
  97. {
  98. await observer.OnNextAsync(queue.Dequeue().Value).ConfigureAwait(false);
  99. }
  100. }
  101. return Create<TSource>(
  102. async x =>
  103. {
  104. var now = clock.Now;
  105. queue.Enqueue(new Timestamped<TSource>(x, now));
  106. await FlushAsync(now).ConfigureAwait(false);
  107. },
  108. observer.OnErrorAsync,
  109. async () =>
  110. {
  111. await FlushAsync(clock.Now).ConfigureAwait(false);
  112. await observer.OnCompletedAsync().ConfigureAwait(false);
  113. }
  114. );
  115. }
  116. }
  117. }