Program.cs 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. // The intention is that people will uncomment whichever method call in Main they want to try.
  5. // The following suppressions prevent warnings due to 'unused' members, and the fact that all of
  6. // the await statements in Main are commented out to start with
  7. #pragma warning disable IDE0051, CS1998
  8. using System;
  9. using System.Linq;
  10. using System.Reactive;
  11. using System.Reactive.Concurrency;
  12. using System.Reactive.Linq;
  13. using System.Reactive.Subjects;
  14. using System.Threading.Tasks;
  15. namespace Playground
  16. {
  17. internal static class Program
  18. {
  19. private static async Task Main()
  20. {
  21. //await AggregateAsync();
  22. //await AllAsync();
  23. //await AnyAsync();
  24. //await AppendAsync();
  25. //await AwaitAsync();
  26. //await BufferTimeHoppingAsync();
  27. //await BufferTimeSlidingAsync();
  28. //await CombineLatestAsync();
  29. //await ConcatAsync();
  30. //await DelayAsync();
  31. //await GroupByAsync();
  32. //await GroupBySelectManyAsync();
  33. //await MergeAsync();
  34. //await PrependAsync();
  35. //await RangeAsync();
  36. //await ReplaySubjectAsync();
  37. //await ReturnAsync();
  38. //await SelectManyAsync();
  39. //await SubjectAsync();
  40. //await TakeUntilAsync();
  41. //await TimerAsync();
  42. //await WhileAsync();
  43. Console.ReadLine();
  44. }
  45. private static async Task AggregateAsync()
  46. {
  47. await AsyncObservable.Range(0, 10).Aggregate(0, (sum, x) => sum + x).SubscribeAsync(Print<int>());
  48. }
  49. private static async Task AllAsync()
  50. {
  51. await AsyncObservable.Range(0, 10).All(x => x < 10).SubscribeAsync(Print<bool>());
  52. }
  53. private static async Task AnyAsync()
  54. {
  55. await AsyncObservable.Range(0, 10).Any(x => x == 5).SubscribeAsync(Print<bool>());
  56. }
  57. private static async Task AppendAsync()
  58. {
  59. await AsyncObservable.Range(0, 10).Append(42).SubscribeAsync(Print<int>());
  60. }
  61. private static async Task AwaitAsync()
  62. {
  63. Console.WriteLine(await AsyncObservable.Range(0, 10));
  64. }
  65. private static async Task BufferTimeHoppingAsync()
  66. {
  67. await
  68. AsyncObservable
  69. .Interval(TimeSpan.FromMilliseconds(300))
  70. .Buffer(TimeSpan.FromSeconds(1))
  71. .Select(xs => string.Join(", ", xs))
  72. .SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
  73. }
  74. private static async Task BufferTimeSlidingAsync()
  75. {
  76. await
  77. AsyncObservable
  78. .Interval(TimeSpan.FromMilliseconds(100))
  79. .Timestamp(TaskPoolAsyncScheduler.Default)
  80. .Buffer(TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(300))
  81. .Select(xs => $"[{xs.First().Timestamp}, {xs.Last().Timestamp}] = {(xs.Last().Timestamp - xs.First().Timestamp).TotalMilliseconds}")
  82. .SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
  83. }
  84. private static async Task CombineLatestAsync()
  85. {
  86. await
  87. AsyncObservable.CombineLatest(
  88. AsyncObservable.Interval(TimeSpan.FromMilliseconds(250)).Take(10).Timestamp(),
  89. AsyncObservable.Interval(TimeSpan.FromMilliseconds(333)).Take(10).Timestamp(),
  90. (x, y) => x.ToString() + ", " + y.ToString()
  91. )
  92. .SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
  93. }
  94. private static async Task ConcatAsync()
  95. {
  96. await
  97. AsyncObservable.Concat(
  98. AsyncObservable.Range(0, 5),
  99. AsyncObservable.Range(5, 5),
  100. AsyncObservable.Range(10, 5),
  101. AsyncObservable.Range(15, 5)
  102. )
  103. .SubscribeAsync(Print<int>()); // TODO: Use ForEachAsync.
  104. }
  105. private static async Task DelayAsync()
  106. {
  107. await
  108. AsyncObservable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
  109. .Timestamp()
  110. .Delay(TimeSpan.FromMilliseconds(2500))
  111. .Timestamp()
  112. .Select(x => new TimeInterval<long>(x.Value.Value, x.Timestamp - x.Value.Timestamp).ToString())
  113. .SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
  114. }
  115. private static async Task GroupByAsync()
  116. {
  117. await
  118. AsyncObservable.Interval(TimeSpan.FromMilliseconds(250))
  119. .Timestamp()
  120. .Take(20)
  121. .GroupBy(t => t.Timestamp.Millisecond / 100)
  122. .SubscribeAsync(async g =>
  123. {
  124. await g.Select(x => g.Key + " - " + x).SubscribeAsync(Print<string>());
  125. });
  126. }
  127. private static async Task GroupBySelectManyAsync()
  128. {
  129. await
  130. AsyncObservable.Interval(TimeSpan.FromMilliseconds(250))
  131. .Timestamp()
  132. .Take(20)
  133. .GroupBy(t => t.Timestamp.Millisecond / 100)
  134. .SelectMany(g => g, (g, x) => g.Key + " - " + x)
  135. .SubscribeAsync(Print<string>());
  136. }
  137. private static async Task MergeAsync()
  138. {
  139. var subject = new SequentialSimpleAsyncSubject<IAsyncObservable<int>>();
  140. var res = subject.Merge();
  141. await res.SubscribeAsync(Print<int>());
  142. for (var i = 1; i <= 10; i++)
  143. {
  144. await subject.OnNextAsync(AsyncObservable.Range(0, i));
  145. }
  146. await subject.OnCompletedAsync();
  147. }
  148. private static async Task PrependAsync()
  149. {
  150. await AsyncObservable.Range(0, 10).Prepend(42).SubscribeAsync(Print<int>());
  151. }
  152. private static async Task RangeAsync()
  153. {
  154. await AsyncObservable.Range(0, 10).SubscribeAsync(PrintAsync<int>()); // TODO: Use ForEachAsync.
  155. }
  156. private static async Task ReplaySubjectAsync()
  157. {
  158. var sub = new SequentialReplayAsyncSubject<int>(5);
  159. var d1 = await sub.SubscribeAsync(x => Console.WriteLine("1> " + x));
  160. await sub.OnNextAsync(40);
  161. await sub.OnNextAsync(41);
  162. var d2 = await sub.SubscribeAsync(x => Console.WriteLine("2> " + x));
  163. await sub.OnNextAsync(42);
  164. await d1.DisposeAsync();
  165. await sub.OnNextAsync(43);
  166. var d3 = await sub.SubscribeAsync(x => Console.WriteLine("3> " + x));
  167. await sub.OnNextAsync(44);
  168. await sub.OnNextAsync(45);
  169. await d3.DisposeAsync();
  170. await sub.OnNextAsync(46);
  171. await d2.DisposeAsync();
  172. await sub.OnNextAsync(47);
  173. }
  174. private static async Task ReturnAsync()
  175. {
  176. await AsyncObservable.Return(42).SubscribeAsync(Print<int>());
  177. }
  178. private static async Task SelectManyAsync()
  179. {
  180. var res = from i in AsyncObservable.Range(0, 10)
  181. from j in AsyncObservable.Range(i * 10, 10)
  182. select i + " -> " + j;
  183. await res.SubscribeAsync(Print<string>());
  184. }
  185. private static async Task SubjectAsync()
  186. {
  187. var subject = new SequentialSimpleAsyncSubject<int>();
  188. var res = subject.Where(x => x % 2 == 0).Select(x => x + 1);
  189. await res.SubscribeAsync(Print<int>());
  190. for (var i = 0; i < 10; i++)
  191. {
  192. await subject.OnNextAsync(i);
  193. }
  194. await subject.OnCompletedAsync();
  195. }
  196. private static async Task TakeUntilAsync()
  197. {
  198. await AsyncObservable.Range(0, int.MaxValue).TakeUntil(DateTimeOffset.Now.AddSeconds(5)).SubscribeAsync(Print<int>()); // TODO: Use ForEachAsync.
  199. }
  200. private static async Task TimerAsync()
  201. {
  202. await AsyncObservable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2)).Take(5).Select(_ => DateTimeOffset.Now).SubscribeAsync(Print<DateTimeOffset>()); // TODO: Use ForEachAsync.
  203. }
  204. private static async Task WhileAsync()
  205. {
  206. var i = 0;
  207. await AsyncObservable.While(() => ++i < 5, AsyncObservable.Range(0, 5)).SubscribeAsync(Print<int>()); // TODO: Use ForEachAsync.
  208. }
  209. private static IAsyncObserver<T> Print<T>()
  210. {
  211. return AsyncObserver.Create<T>(
  212. x =>
  213. {
  214. Console.WriteLine(x);
  215. return default;
  216. },
  217. ex =>
  218. {
  219. Console.WriteLine("Error: " + ex);
  220. return default;
  221. },
  222. () =>
  223. {
  224. Console.WriteLine("Completed");
  225. return default;
  226. }
  227. );
  228. }
  229. private static IAsyncObserver<T> PrintAsync<T>()
  230. {
  231. return AsyncObserver.Create<T>(
  232. async x =>
  233. {
  234. await Task.Yield();
  235. Console.WriteLine(x);
  236. },
  237. async ex =>
  238. {
  239. await Task.Yield();
  240. Console.WriteLine("Error: " + ex);
  241. },
  242. async () =>
  243. {
  244. await Task.Yield();
  245. Console.WriteLine("Completed");
  246. }
  247. );
  248. }
  249. }
  250. }