Program.cs 8.7 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Linq;
  6. using System.Reactive;
  7. using System.Reactive.Concurrency;
  8. using System.Reactive.Linq;
  9. using System.Reactive.Subjects;
  10. using System.Threading.Tasks;
  11. namespace Playground
  12. {
  13. static class Program
  14. {
  /// <summary>
  /// Synchronous entry point: runs <see cref="MainAsync"/> to completion, then waits for a line
  /// on standard input before returning.
  /// </summary>
  static void Main()
  {
      // Block the calling thread until the asynchronous driver has finished.
      var pending = MainAsync();
      pending.GetAwaiter().GetResult();

      // Wait for a line on standard input before returning.
      Console.ReadLine();
  }
  /// <summary>
  /// Asynchronous driver invoked by <see cref="Main"/>. Every sample call below is commented out;
  /// remove the comment marker in front of a line to run that sample.
  /// </summary>
  static async Task MainAsync()
  {
      // await AggregateAsync();
      // await AllAsync();
      // await AnyAsync();
      // await AppendAsync();
      // await AwaitAsync();
      // await BufferTimeHoppingAsync();
      // await BufferTimeSlidingAsync();
      // await CombineLatestAsync();
      // await ConcatAsync();
      // await DelayAsync();
      // await GroupByAsync();
      // await GroupBySelectManyAsync();
      // await MergeAsync();
      // await PrependAsync();
      // await RangeAsync();
      // await ReplaySubjectAsync();
      // await ReturnAsync();
      // await SelectManyAsync();
      // await SubjectAsync();
      // await TakeUntilAsync();
      // await TimerAsync();
  }
  /// <summary>
  /// Aggregate sample: folds Range(0, 10) with an addition accumulator seeded at 0 and subscribes
  /// the <see cref="Print{T}"/> observer to the result.
  /// </summary>
  static async Task AggregateAsync()
  {
      var numbers = AsyncObservable.Range(0, 10);
      var total = numbers.Aggregate(0, (acc, item) => acc + item);

      await total.SubscribeAsync(Print<int>());
  }
  /// <summary>
  /// All sample: applies the predicate "value &lt; 10" to Range(0, 10) via All and subscribes the
  /// <see cref="Print{T}"/> observer to the boolean result.
  /// </summary>
  static async Task AllAsync() =>
      await AsyncObservable
          .Range(0, 10)
          .All(value => value < 10)
          .SubscribeAsync(Print<bool>());
  /// <summary>
  /// Any sample: applies the predicate "value == 5" to Range(0, 10) via Any and subscribes the
  /// <see cref="Print{T}"/> observer to the boolean result.
  /// </summary>
  static async Task AnyAsync()
  {
      var numbers = AsyncObservable.Range(0, 10);
      var containsFive = numbers.Any(value => value == 5);

      await containsFive.SubscribeAsync(Print<bool>());
  }
  /// <summary>
  /// Append sample: calls Append(42) on Range(0, 10) and subscribes the
  /// <see cref="Print{T}"/> observer to the resulting sequence.
  /// </summary>
  static async Task AppendAsync() =>
      await AsyncObservable
          .Range(0, 10)
          .Append(42)
          .SubscribeAsync(Print<int>());
  /// <summary>
  /// Await sample: awaits the Range(0, 10) observable directly and writes the awaited value to
  /// the console.
  /// </summary>
  static async Task AwaitAsync()
  {
      var awaited = await AsyncObservable.Range(0, 10);

      Console.WriteLine(awaited);
  }
  /// <summary>
  /// Buffer sample (single time span): buffers a 300 ms Interval with a 1 second time span, joins
  /// each buffer with ", " and subscribes the <see cref="Print{T}"/> observer.
  /// </summary>
  static async Task BufferTimeHoppingAsync()
  {
      var ticks = AsyncObservable.Interval(TimeSpan.FromMilliseconds(300));
      var batches = ticks.Buffer(TimeSpan.FromSeconds(1));
      var lines = batches.Select(batch => string.Join(", ", batch));

      await lines.SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
  }
  /// <summary>
  /// Buffer sample (two time spans): timestamps a 100 ms Interval on
  /// TaskPoolAsyncScheduler.Default, buffers it with time spans of 1 second and 300 ms, and for
  /// each buffer prints the first and last timestamps plus the milliseconds between them.
  /// </summary>
  static async Task BufferTimeSlidingAsync()
  {
      var stamped = AsyncObservable
          .Interval(TimeSpan.FromMilliseconds(100))
          .Timestamp(TaskPoolAsyncScheduler.Default);

      var batches = stamped.Buffer(TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(300));

      var report = batches.Select(batch =>
      {
          // Read the two boundary timestamps once and reuse them for the span calculation.
          var first = batch.First().Timestamp;
          var last = batch.Last().Timestamp;

          return $"[{first}, {last}] = {(last - first).TotalMilliseconds}";
      });

      await report.SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
  }
  /// <summary>
  /// CombineLatest sample: combines two timestamped Interval sequences (250 ms and 333 ms, each
  /// limited with Take(10)) into "left, right" strings and subscribes the
  /// <see cref="Print{T}"/> observer.
  /// </summary>
  static async Task CombineLatestAsync()
  {
      var left = AsyncObservable.Interval(TimeSpan.FromMilliseconds(250)).Take(10).Timestamp();
      var right = AsyncObservable.Interval(TimeSpan.FromMilliseconds(333)).Take(10).Timestamp();

      var combined = AsyncObservable.CombineLatest(
          left,
          right,
          (l, r) => l.ToString() + ", " + r.ToString());

      await combined.SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
  }
  /// <summary>
  /// Concat sample: concatenates four Range sequences (starting at 0, 5, 10 and 15, each with a
  /// count of 5) and subscribes the <see cref="Print{T}"/> observer.
  /// </summary>
  static async Task ConcatAsync()
  {
      var segment0 = AsyncObservable.Range(0, 5);
      var segment1 = AsyncObservable.Range(5, 5);
      var segment2 = AsyncObservable.Range(10, 5);
      var segment3 = AsyncObservable.Range(15, 5);

      var joined = AsyncObservable.Concat(segment0, segment1, segment2, segment3);

      await joined.SubscribeAsync(Print<int>()); // TODO: Use ForEachAsync.
  }
  /// <summary>
  /// Delay sample: timestamps a Timer sequence, applies Delay(2500 ms), timestamps it again, and
  /// prints a TimeInterval built from the original value and the difference between the two
  /// timestamps.
  /// </summary>
  static async Task DelayAsync()
  {
      var beforeDelay = AsyncObservable
          .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
          .Timestamp();

      var afterDelay = beforeDelay
          .Delay(TimeSpan.FromMilliseconds(2500))
          .Timestamp();

      var lag = afterDelay.Select(outer =>
      {
          // outer carries the second timestamp; outer.Value is the item stamped before the delay.
          var inner = outer.Value;
          var elapsed = outer.Timestamp - inner.Timestamp;

          return new TimeInterval<long>(inner.Value, elapsed).ToString();
      });

      await lag.SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
  }
  /// <summary>
  /// GroupBy sample: groups 20 timestamped Interval items by "Timestamp.Millisecond / 100" and,
  /// for every group received, subscribes the <see cref="Print{T}"/> observer to the group's
  /// items prefixed with the group key.
  /// </summary>
  static async Task GroupByAsync()
  {
      var groups = AsyncObservable
          .Interval(TimeSpan.FromMilliseconds(250))
          .Timestamp()
          .Take(20)
          .GroupBy(stamped => stamped.Timestamp.Millisecond / 100);

      await groups.SubscribeAsync(async bucket =>
      {
          var labelled = bucket.Select(item => bucket.Key + " - " + item);

          await labelled.SubscribeAsync(Print<string>());
      });
  }
  /// <summary>
  /// GroupBy + SelectMany sample: groups 20 timestamped Interval items by
  /// "Timestamp.Millisecond / 100", flattens the groups into "key - item" strings and subscribes
  /// the <see cref="Print{T}"/> observer.
  /// </summary>
  static async Task GroupBySelectManyAsync()
  {
      var groups = AsyncObservable
          .Interval(TimeSpan.FromMilliseconds(250))
          .Timestamp()
          .Take(20)
          .GroupBy(stamped => stamped.Timestamp.Millisecond / 100);

      // Two from-clauses followed by select compile to SelectMany(collectionSelector, resultSelector).
      var flattened = from bucket in groups
                      from item in bucket
                      select bucket.Key + " - " + item;

      await flattened.SubscribeAsync(Print<string>());
  }
  /// <summary>
  /// Merge sample: subscribes the <see cref="Print{T}"/> observer to the Merge of a subject of
  /// inner sequences, then pushes Range(0, i) for i = 1..10 into the subject and completes it.
  /// </summary>
  static async Task MergeAsync()
  {
      var sources = new SequentialSimpleAsyncSubject<IAsyncObservable<int>>();
      var merged = sources.Merge();

      // Subscribe before pushing the inner sequences.
      await merged.SubscribeAsync(Print<int>());

      var count = 1;
      while (count <= 10)
      {
          await sources.OnNextAsync(AsyncObservable.Range(0, count));
          count++;
      }

      await sources.OnCompletedAsync();
  }
  /// <summary>
  /// Prepend sample: calls Prepend(42) on Range(0, 10) and subscribes the
  /// <see cref="Print{T}"/> observer to the resulting sequence.
  /// </summary>
  static async Task PrependAsync()
  {
      var numbers = AsyncObservable.Range(0, 10);
      var withHead = numbers.Prepend(42);

      await withHead.SubscribeAsync(Print<int>());
  }
  /// <summary>
  /// Range sample: subscribes the <see cref="Print{T}"/> observer to Range(0, 10).
  /// </summary>
  static async Task RangeAsync()
  {
      var numbers = AsyncObservable.Range(0, 10);

      await numbers.SubscribeAsync(Print<int>()); // TODO: Use ForEachAsync.
  }
  /// <summary>
  /// Replay subject sample: creates a SequentialReplayAsyncSubject with constructor argument 5,
  /// then interleaves pushing the values 40..47 with adding and disposing three console
  /// subscribers tagged "1> ", "2> " and "3> ".
  /// </summary>
  static async Task ReplaySubjectAsync()
  {
      var replay = new SequentialReplayAsyncSubject<int>(5);

      // Subscriber 1 is attached before any value is pushed.
      var first = await replay.SubscribeAsync(async value => Console.WriteLine("1> " + value));

      await replay.OnNextAsync(40);
      await replay.OnNextAsync(41);

      // Subscriber 2 is attached after 40 and 41 were pushed.
      var second = await replay.SubscribeAsync(async value => Console.WriteLine("2> " + value));

      await replay.OnNextAsync(42);

      await first.DisposeAsync();

      await replay.OnNextAsync(43);

      // Subscriber 3 is attached after subscriber 1 was disposed.
      var third = await replay.SubscribeAsync(async value => Console.WriteLine("3> " + value));

      await replay.OnNextAsync(44);
      await replay.OnNextAsync(45);

      await third.DisposeAsync();

      await replay.OnNextAsync(46);

      await second.DisposeAsync();

      // Pushed after all three subscriptions have been disposed.
      await replay.OnNextAsync(47);
  }
  /// <summary>
  /// Return sample: subscribes the <see cref="Print{T}"/> observer to Return(42).
  /// </summary>
  static async Task ReturnAsync() =>
      await AsyncObservable
          .Return(42)
          .SubscribeAsync(Print<int>());
  /// <summary>
  /// SelectMany sample: for each i in Range(0, 10) projects Range(i * 10, 10) and prints every
  /// pair as "i -> j" through the <see cref="Print{T}"/> observer.
  /// </summary>
  static async Task SelectManyAsync()
  {
      // Method-syntax form of "from i in ... from j in ... select ...".
      var pairs = AsyncObservable
          .Range(0, 10)
          .SelectMany(
              i => AsyncObservable.Range(i * 10, 10),
              (i, j) => i + " -> " + j);

      await pairs.SubscribeAsync(Print<string>());
  }
  /// <summary>
  /// Subject sample: subscribes the <see cref="Print{T}"/> observer to a subject filtered to even
  /// values and projected to "value + 1", then pushes 0..9 into the subject and completes it.
  /// </summary>
  static async Task SubjectAsync()
  {
      var input = new SequentialSimpleAsyncSubject<int>();
      var evensPlusOne = input.Where(value => value % 2 == 0).Select(value => value + 1);

      // Subscribe before pushing the values.
      await evensPlusOne.SubscribeAsync(Print<int>());

      foreach (var value in Enumerable.Range(0, 10))
      {
          await input.OnNextAsync(value);
      }

      await input.OnCompletedAsync();
  }
  /// <summary>
  /// TakeUntil sample: applies TakeUntil with an end time of DateTimeOffset.Now plus 5 seconds to
  /// Range(0, int.MaxValue) and subscribes the <see cref="Print{T}"/> observer.
  /// </summary>
  static async Task TakeUntilAsync()
  {
      var numbers = AsyncObservable.Range(0, int.MaxValue);
      var endTime = DateTimeOffset.Now.AddSeconds(5);
      var bounded = numbers.TakeUntil(endTime);

      await bounded.SubscribeAsync(Print<int>()); // TODO: Use ForEachAsync.
  }
  /// <summary>
  /// Timer sample: builds Timer(1 second, 2 seconds), limits it with Take(5), maps every item to
  /// DateTimeOffset.Now and subscribes the <see cref="Print{T}"/> observer.
  /// </summary>
  static async Task TimerAsync()
  {
      var ticks = AsyncObservable
          .Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2))
          .Take(5);

      var observedAt = ticks.Select(_ => DateTimeOffset.Now);

      await observedAt.SubscribeAsync(Print<DateTimeOffset>()); // TODO: Use ForEachAsync.
  }
  /// <summary>
  /// Creates an observer that reports everything it receives on the console: each element as-is,
  /// an error prefixed with "Error: ", and completion as "Completed".
  /// </summary>
  /// <typeparam name="T">Element type of the observed sequence.</typeparam>
  /// <returns>An observer built with AsyncObserver.Create whose callbacks all return a completed task.</returns>
  static IAsyncObserver<T> Print<T>()
  {
      Func<T, Task> onNext = value =>
      {
          Console.WriteLine(value);
          return Task.CompletedTask;
      };

      Func<Exception, Task> onError = error =>
      {
          Console.WriteLine("Error: " + error);
          return Task.CompletedTask;
      };

      Func<Task> onCompleted = () =>
      {
          Console.WriteLine("Completed");
          return Task.CompletedTask;
      };

      return AsyncObserver.Create<T>(onNext, onError, onCompleted);
  }
  223. }
  224. }