Program.cs 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Linq;
  6. using System.Reactive;
  7. using System.Reactive.Concurrency;
  8. using System.Reactive.Linq;
  9. using System.Reactive.Subjects;
  10. using System.Threading.Tasks;
  namespace Playground
  {
      /// <summary>
      /// Console playground: each sample method builds a small async observable
      /// pipeline and subscribes the console observer returned by <see cref="Print{T}"/>.
      /// </summary>
      static class Program
      {
          /// <summary>
          /// Entry point. Blocks on <see cref="MainAsync"/> and then waits for a line
          /// on standard input before returning.
          /// </summary>
          static void Main()
          {
              var samples = MainAsync();
              samples.GetAwaiter().GetResult();

              Console.ReadLine();
          }

          /// <summary>
          /// Sample selector. Every sample is currently commented out; uncomment a
          /// line to await that sample.
          /// </summary>
          static async Task MainAsync()
          {
              //await AggregateAsync();
              //await AllAsync();
              //await AnyAsync();
              //await AppendAsync();
              //await AwaitAsync();
              //await BufferTimeHoppingAsync();
              //await BufferTimeSlidingAsync();
              //await CombineLatestAsync();
              //await ConcatAsync();
              //await DelayAsync();
              //await MergeAsync();
              //await PrependAsync();
              //await RangeAsync();
              //await ReturnAsync();
              //await SelectManyAsync();
              //await SubjectAsync();
              //await TakeUntilAsync();
              //await TimerAsync();
          }

          /// <summary>
          /// Applies Aggregate (seed 0, addition) to Range(0, 10) and prints the outcome.
          /// </summary>
          static async Task AggregateAsync()
          {
              var numbers = AsyncObservable.Range(0, 10);
              var total = numbers.Aggregate(0, (acc, value) => acc + value);

              await total.SubscribeAsync(Print<int>());
          }

          /// <summary>
          /// Applies All with the predicate "less than 10" to Range(0, 10) and prints the outcome.
          /// </summary>
          static async Task AllAsync()
          {
              var numbers = AsyncObservable.Range(0, 10);
              var verdict = numbers.All(value => value < 10);

              await verdict.SubscribeAsync(Print<bool>());
          }

          /// <summary>
          /// Applies Any with the predicate "equals 5" to Range(0, 10) and prints the outcome.
          /// </summary>
          static async Task AnyAsync()
          {
              var numbers = AsyncObservable.Range(0, 10);
              var verdict = numbers.Any(value => value == 5);

              await verdict.SubscribeAsync(Print<bool>());
          }

          /// <summary>
          /// Applies Append(42) to Range(0, 10) and prints the resulting sequence.
          /// </summary>
          static async Task AppendAsync()
          {
              var numbers = AsyncObservable.Range(0, 10);
              var extended = numbers.Append(42);

              await extended.SubscribeAsync(Print<int>());
          }

          /// <summary>
          /// Awaits Range(0, 10) directly and writes the awaited value to the console.
          /// </summary>
          static async Task AwaitAsync()
          {
              var awaited = await AsyncObservable.Range(0, 10);

              Console.WriteLine(awaited);
          }

          /// <summary>
          /// Buffers a 300 ms Interval with a one-second time span, joins each buffer
          /// with ", " and prints the joined strings.
          /// </summary>
          static async Task BufferTimeHoppingAsync()
          {
              var ticks = AsyncObservable.Interval(TimeSpan.FromMilliseconds(300));
              var windows = ticks.Buffer(TimeSpan.FromSeconds(1));
              var lines = windows.Select(items => string.Join(", ", items));

              await lines.SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
          }

          /// <summary>
          /// Timestamps a 100 ms Interval, buffers it with a one-second span and a
          /// 300 ms shift, and prints each buffer's first and last timestamps together
          /// with the milliseconds between them.
          /// </summary>
          static async Task BufferTimeSlidingAsync()
          {
              var ticks = AsyncObservable.Interval(TimeSpan.FromMilliseconds(100));
              var stamped = ticks.Timestamp(TaskPoolAsyncScheduler.Default);
              var windows = stamped.Buffer(TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(300));

              var lines = windows.Select(items =>
              {
                  var first = items.First().Timestamp;
                  var last = items.Last().Timestamp;

                  return $"[{first}, {last}] = {(last - first).TotalMilliseconds}";
              });

              await lines.SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
          }

          /// <summary>
          /// Combines two timestamped Interval sequences (250 ms and 333 ms periods,
          /// Take(10) each) with CombineLatest and prints both values separated by ", ".
          /// </summary>
          static async Task CombineLatestAsync()
          {
              var left = AsyncObservable.Interval(TimeSpan.FromMilliseconds(250)).Take(10).Timestamp();
              var right = AsyncObservable.Interval(TimeSpan.FromMilliseconds(333)).Take(10).Timestamp();

              var pairs = AsyncObservable.CombineLatest(
                  left,
                  right,
                  (l, r) => string.Concat(l.ToString(), ", ", r.ToString()));

              await pairs.SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
          }

          /// <summary>
          /// Passes four Range sequences (starting at 0, 5, 10 and 15, five items each)
          /// to Concat and prints the result.
          /// </summary>
          static async Task ConcatAsync()
          {
              var first = AsyncObservable.Range(0, 5);
              var second = AsyncObservable.Range(5, 5);
              var third = AsyncObservable.Range(10, 5);
              var fourth = AsyncObservable.Range(15, 5);

              var all = AsyncObservable.Concat(first, second, third, fourth);

              await all.SubscribeAsync(Print<int>()); // TODO: Use ForEachAsync.
          }

          /// <summary>
          /// Timestamps a one-second Timer, applies Delay(2500 ms), timestamps again and
          /// prints a TimeInterval holding the value and the difference of the two timestamps.
          /// </summary>
          static async Task DelayAsync()
          {
              var ticks = AsyncObservable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1)).Timestamp();
              var delayed = ticks.Delay(TimeSpan.FromMilliseconds(2500)).Timestamp();

              // The outer timestamp is taken after Delay, the inner one before it.
              var lines = delayed.Select(outer =>
                  new TimeInterval<long>(outer.Value.Value, outer.Timestamp - outer.Value.Timestamp).ToString());

              await lines.SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
          }

          /// <summary>
          /// Subscribes to the Merge of a subject of sequences, then pushes
          /// Range(0, 1) through Range(0, 10) into the subject and completes it.
          /// </summary>
          static async Task MergeAsync()
          {
              var sources = new SequentialSimpleAsyncSubject<IAsyncObservable<int>>();
              var merged = sources.Merge();

              await merged.SubscribeAsync(Print<int>());

              foreach (var count in Enumerable.Range(1, 10))
              {
                  await sources.OnNextAsync(AsyncObservable.Range(0, count));
              }

              await sources.OnCompletedAsync();
          }

          /// <summary>
          /// Applies Prepend(42) to Range(0, 10) and prints the resulting sequence.
          /// </summary>
          static async Task PrependAsync()
          {
              var numbers = AsyncObservable.Range(0, 10);
              var extended = numbers.Prepend(42);

              await extended.SubscribeAsync(Print<int>());
          }

          /// <summary>
          /// Prints Range(0, 10).
          /// </summary>
          static async Task RangeAsync()
          {
              var numbers = AsyncObservable.Range(0, 10);

              await numbers.SubscribeAsync(Print<int>()); // TODO: Use ForEachAsync.
          }

          /// <summary>
          /// Prints Return(42).
          /// </summary>
          static async Task ReturnAsync()
          {
              var single = AsyncObservable.Return(42);

              await single.SubscribeAsync(Print<int>());
          }

          /// <summary>
          /// For each i in Range(0, 10), projects Range(i * 10, 10) through SelectMany
          /// and prints "i -> j" for every resulting pair.
          /// </summary>
          static async Task SelectManyAsync()
          {
              var pairs = AsyncObservable
                  .Range(0, 10)
                  .SelectMany(
                      i => AsyncObservable.Range(i * 10, 10),
                      (i, j) => i + " -> " + j);

              await pairs.SubscribeAsync(Print<string>());
          }

          /// <summary>
          /// Subscribes to a subject filtered to even values and incremented by one,
          /// then pushes 0 through 9 into the subject and completes it.
          /// </summary>
          static async Task SubjectAsync()
          {
              var input = new SequentialSimpleAsyncSubject<int>();
              var output = input.Where(value => value % 2 == 0).Select(value => value + 1);

              await output.SubscribeAsync(Print<int>());

              foreach (var value in Enumerable.Range(0, 10))
              {
                  await input.OnNextAsync(value);
              }

              await input.OnCompletedAsync();
          }

          /// <summary>
          /// Applies TakeUntil, with a deadline five seconds after the current time,
          /// to Range(0, int.MaxValue) and prints what comes through.
          /// </summary>
          static async Task TakeUntilAsync()
          {
              var numbers = AsyncObservable.Range(0, int.MaxValue);
              var deadline = DateTimeOffset.Now.AddSeconds(5);
              var limited = numbers.TakeUntil(deadline);

              await limited.SubscribeAsync(Print<int>()); // TODO: Use ForEachAsync.
          }

          /// <summary>
          /// Takes five items from a Timer (one-second due time, two-second period),
          /// maps each to the current time and prints it.
          /// </summary>
          static async Task TimerAsync()
          {
              var ticks = AsyncObservable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2)).Take(5);
              var moments = ticks.Select(_ => DateTimeOffset.Now);

              await moments.SubscribeAsync(Print<DateTimeOffset>()); // TODO: Use ForEachAsync.
          }

          /// <summary>
          /// Creates an observer that writes each value, any error (prefixed with
          /// "Error: ") and a "Completed" marker to the console.
          /// </summary>
          /// <typeparam name="T">Type of the observed values.</typeparam>
          /// <returns>The observer built by AsyncObserver.Create from the three handlers.</returns>
          static IAsyncObserver<T> Print<T>()
          {
              Func<T, Task> onNext = value =>
              {
                  Console.WriteLine(value);
                  return Task.CompletedTask;
              };

              Func<Exception, Task> onError = error =>
              {
                  Console.WriteLine("Error: " + error);
                  return Task.CompletedTask;
              };

              Func<Task> onCompleted = () =>
              {
                  Console.WriteLine("Completed");
                  return Task.CompletedTask;
              };

              return AsyncObserver.Create<T>(onNext, onError, onCompleted);
          }
      }
  }