Program.cs 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Linq;
  6. using System.Reactive;
  7. using System.Reactive.Concurrency;
  8. using System.Reactive.Linq;
  9. using System.Reactive.Subjects;
  10. using System.Threading.Tasks;
  namespace Playground
  {
      /// <summary>
      /// Console playground hosting a set of small demos built on <c>AsyncObservable</c>.
      /// Exactly one demo is awaited from <see cref="MainAsync"/>; the others stay
      /// commented out there so they can be switched on by hand.
      /// </summary>
      static class Program
      {
          /// <summary>
          /// Entry point: blocks until <see cref="MainAsync"/> has finished, then waits
          /// for a line of console input before returning.
          /// </summary>
          static void Main()
          {
              // NOTE(review): the ReadLine presumably keeps the process alive so that
              // subscriptions which keep producing output stay visible - confirm.
              var pending = MainAsync();
              pending.GetAwaiter().GetResult();

              Console.ReadLine();
          }

          /// <summary>
          /// Awaits the currently selected demo. The commented-out calls form the menu
          /// of the other available demos.
          /// </summary>
          static async Task MainAsync()
          {
              //await AwaitAsync();
              //await BufferTimeHoppingAsync();
              //await BufferTimeSlidingAsync();
              //await ConcatAsync();
              await DelayAsync();
              //await MergeAsync();
              //await RangeAsync();
              //await ReturnAsync();
              //await SelectManyAsync();
              //await SubjectAsync();
              //await TakeUntilAsync();
              //await TimerAsync();
          }

          /// <summary>
          /// Awaits <c>AsyncObservable.Range(0, 10)</c> directly and writes the awaited
          /// value to the console.
          /// </summary>
          static async Task AwaitAsync()
          {
              // NOTE(review): presumably yields the final element of the sequence - confirm.
              var awaited = await AsyncObservable.Range(0, 10);
              Console.WriteLine(awaited);
          }

          /// <summary>
          /// Buffers a 300 ms interval sequence into 1-second buffers and prints each
          /// buffer as a comma-separated line.
          /// </summary>
          static async Task BufferTimeHoppingAsync()
          {
              var ticks = AsyncObservable.Interval(TimeSpan.FromMilliseconds(300));
              var batches = ticks.Buffer(TimeSpan.FromSeconds(1));
              var lines = batches.Select(batch => string.Join(", ", batch));

              await lines.SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
          }

          /// <summary>
          /// Timestamps a 100 ms interval sequence, buffers it with a 1-second time span
          /// and a 300 ms time shift, and prints the first and last timestamp of each
          /// buffer together with the milliseconds between them.
          /// </summary>
          static async Task BufferTimeSlidingAsync()
          {
              var stamped = AsyncObservable
                  .Interval(TimeSpan.FromMilliseconds(100))
                  .Timestamp(TaskPoolAsyncScheduler.Default);

              var windows = stamped.Buffer(TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(300));

              var lines = windows.Select(window =>
              {
                  // First() is evaluated before Last(), as in the original expression.
                  var earliest = window.First().Timestamp;
                  var latest = window.Last().Timestamp;
                  return $"[{earliest}, {latest}] = {(latest - earliest).TotalMilliseconds}";
              });

              await lines.SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
          }

          /// <summary>
          /// Concatenates four consecutive ranges (0-4, 5-9, 10-14, 15-19) and prints
          /// the resulting sequence.
          /// </summary>
          static async Task ConcatAsync()
          {
              var first = AsyncObservable.Range(0, 5);
              var second = AsyncObservable.Range(5, 5);
              var third = AsyncObservable.Range(10, 5);
              var fourth = AsyncObservable.Range(15, 5);

              var all = AsyncObservable.Concat(first, second, third, fourth);

              await all.SubscribeAsync(Print<int>()); // TODO: Use ForEachAsync.
          }

          /// <summary>
          /// Timestamps each tick of a 1-second timer, delays it by 2500 ms, timestamps
          /// it again, and prints the tick value with the difference between the two
          /// timestamps as a <c>TimeInterval&lt;long&gt;</c>.
          /// </summary>
          static async Task DelayAsync()
          {
              var ticks = AsyncObservable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1));
              var stampedBefore = ticks.Timestamp();
              var delayed = stampedBefore.Delay(TimeSpan.FromMilliseconds(2500));
              var stampedAfter = delayed.Timestamp();

              // outer.Timestamp is the post-delay stamp; outer.Value.Timestamp the pre-delay one.
              var lines = stampedAfter.Select(outer =>
                  new TimeInterval<long>(outer.Value.Value, outer.Timestamp - outer.Value.Timestamp).ToString());

              await lines.SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
          }

          /// <summary>
          /// Subscribes a printer to the merge of a subject of inner sequences, then
          /// pushes <c>Range(0, 1)</c> through <c>Range(0, 10)</c> into the subject and
          /// completes it.
          /// </summary>
          static async Task MergeAsync()
          {
              var sources = new SequentialSimpleAsyncSubject<IAsyncObservable<int>>();

              await sources.Merge().SubscribeAsync(Print<int>());

              foreach (var length in Enumerable.Range(1, 10))
              {
                  await sources.OnNextAsync(AsyncObservable.Range(0, length));
              }

              await sources.OnCompletedAsync();
          }

          /// <summary>
          /// Prints the sequence produced by <c>AsyncObservable.Range(0, 10)</c>.
          /// </summary>
          static async Task RangeAsync()
          {
              var numbers = AsyncObservable.Range(0, 10);
              await numbers.SubscribeAsync(Print<int>()); // TODO: Use ForEachAsync.
          }

          /// <summary>
          /// Prints the sequence produced by <c>AsyncObservable.Return(42)</c>.
          /// </summary>
          static async Task ReturnAsync()
          {
              var single = AsyncObservable.Return(42);
              await single.SubscribeAsync(Print<int>());
          }

          /// <summary>
          /// For each <c>i</c> in <c>Range(0, 10)</c>, projects <c>Range(i * 10, 10)</c>
          /// and prints every pair as <c>"i -> j"</c>.
          /// </summary>
          static async Task SelectManyAsync()
          {
              // Method-syntax form of: from i in ... from j in ... select i + " -> " + j
              var pairs = AsyncObservable.Range(0, 10).SelectMany(
                  i => AsyncObservable.Range(i * 10, 10),
                  (i, j) => i + " -> " + j);

              await pairs.SubscribeAsync(Print<string>());
          }

          /// <summary>
          /// Subscribes a printer to a subject filtered to even values and mapped to
          /// <c>x + 1</c>, then pushes 0 through 9 into the subject and completes it.
          /// </summary>
          static async Task SubjectAsync()
          {
              var input = new SequentialSimpleAsyncSubject<int>();

              var evens = input.Where(value => value % 2 == 0);
              var shifted = evens.Select(value => value + 1);

              await shifted.SubscribeAsync(Print<int>());

              foreach (var value in Enumerable.Range(0, 10))
              {
                  await input.OnNextAsync(value);
              }

              await input.OnCompletedAsync();
          }

          /// <summary>
          /// Prints <c>Range(0, int.MaxValue)</c> cut off by <c>TakeUntil</c> at a
          /// deadline five seconds after the current local time.
          /// </summary>
          static async Task TakeUntilAsync()
          {
              var numbers = AsyncObservable.Range(0, int.MaxValue);
              var deadline = DateTimeOffset.Now.AddSeconds(5);

              await numbers.TakeUntil(deadline).SubscribeAsync(Print<int>()); // TODO: Use ForEachAsync.
          }

          /// <summary>
          /// Takes five ticks from a timer (1-second due time, 2-second period) and
          /// prints the local time observed when each tick is projected.
          /// </summary>
          static async Task TimerAsync()
          {
              var ticks = AsyncObservable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2));
              var firstFive = ticks.Take(5);
              var times = firstFive.Select(_ => DateTimeOffset.Now);

              await times.SubscribeAsync(Print<DateTimeOffset>()); // TODO: Use ForEachAsync.
          }

          /// <summary>
          /// Creates an observer that writes every element, any error (prefixed with
          /// <c>"Error: "</c>), and a final <c>"Completed"</c> line to the console.
          /// </summary>
          /// <typeparam name="T">Element type of the observed sequence.</typeparam>
          /// <returns>An observer whose callbacks all complete synchronously.</returns>
          static IAsyncObserver<T> Print<T>()
          {
              Func<T, Task> onNext = item =>
              {
                  Console.WriteLine(item);
                  return Task.CompletedTask;
              };

              Func<Exception, Task> onError = error =>
              {
                  Console.WriteLine("Error: " + error);
                  return Task.CompletedTask;
              };

              Func<Task> onCompleted = () =>
              {
                  Console.WriteLine("Completed");
                  return Task.CompletedTask;
              };

              return AsyncObserver.Create<T>(onNext, onError, onCompleted);
          }
      }
  }