// Program.cs (4.9 KB)
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Linq;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Linq;
  8. using System.Reactive.Subjects;
  9. using System.Threading.Tasks;
  10. namespace Playground
  11. {
  12. static class Program
  13. {
  14. static void Main()
  15. {
  16. MainAsync().GetAwaiter().GetResult();
  17. Console.ReadLine();
  18. }
  19. static async Task MainAsync()
  20. {
  21. await AwaitAsync();
  22. await BufferTimeHoppingAsync();
  23. await BufferTimeSlidingAsync();
  24. await ConcatAsync();
  25. await MergeAsync();
  26. await RangeAsync();
  27. await ReturnAsync();
  28. await SelectManyAsync();
  29. await SubjectAsync();
  30. await TakeUntilAsync();
  31. await TimerAsync();
  32. }
  33. static async Task AwaitAsync()
  34. {
  35. Console.WriteLine(await AsyncObservable.Range(0, 10));
  36. }
  37. static async Task BufferTimeHoppingAsync()
  38. {
  39. await
  40. AsyncObservable
  41. .Interval(TimeSpan.FromMilliseconds(300))
  42. .Buffer(TimeSpan.FromSeconds(1))
  43. .Select(xs => string.Join(", ", xs))
  44. .SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
  45. }
  46. static async Task BufferTimeSlidingAsync()
  47. {
  48. await
  49. AsyncObservable
  50. .Interval(TimeSpan.FromMilliseconds(100))
  51. .Timestamp(TaskPoolAsyncScheduler.Default)
  52. .Buffer(TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(300))
  53. .Select(xs => $"[{xs.First().Timestamp}, {xs.Last().Timestamp}] = {(xs.Last().Timestamp - xs.First().Timestamp).TotalMilliseconds}")
  54. .SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
  55. }
  56. static async Task ConcatAsync()
  57. {
  58. await
  59. AsyncObservable.Concat(
  60. AsyncObservable.Range(0, 5),
  61. AsyncObservable.Range(5, 5),
  62. AsyncObservable.Range(10, 5),
  63. AsyncObservable.Range(15, 5)
  64. )
  65. .SubscribeAsync(Print<int>()); // TODO: Use ForEachAsync.
  66. }
  67. static async Task MergeAsync()
  68. {
  69. var subject = new SequentialSimpleAsyncSubject<IAsyncObservable<int>>();
  70. var res = subject.Merge();
  71. await res.SubscribeAsync(Print<int>());
  72. for (var i = 1; i <= 10; i++)
  73. {
  74. await subject.OnNextAsync(AsyncObservable.Range(0, i));
  75. }
  76. await subject.OnCompletedAsync();
  77. }
  78. static async Task RangeAsync()
  79. {
  80. await AsyncObservable.Range(0, 10).SubscribeAsync(Print<int>()); // TODO: Use ForEachAsync.
  81. }
  82. static async Task ReturnAsync()
  83. {
  84. await AsyncObservable.Return(42).SubscribeAsync(Print<int>());
  85. }
  86. static async Task SelectManyAsync()
  87. {
  88. var res = from i in AsyncObservable.Range(0, 10)
  89. from j in AsyncObservable.Range(i * 10, 10)
  90. select i + " -> " + j;
  91. await res.SubscribeAsync(Print<string>());
  92. }
  93. static async Task SubjectAsync()
  94. {
  95. var subject = new SequentialSimpleAsyncSubject<int>();
  96. var res = subject.Where(x => x % 2 == 0).Select(x => x + 1);
  97. await res.SubscribeAsync(Print<int>());
  98. for (var i = 0; i < 10; i++)
  99. {
  100. await subject.OnNextAsync(i);
  101. }
  102. await subject.OnCompletedAsync();
  103. }
  104. static async Task TakeUntilAsync()
  105. {
  106. await AsyncObservable.Range(0, int.MaxValue).TakeUntil(DateTimeOffset.Now.AddSeconds(5)).SubscribeAsync(Print<int>()); // TODO: Use ForEachAsync.
  107. }
  108. static async Task TimerAsync()
  109. {
  110. await AsyncObservable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2)).Take(5).Select(_ => DateTimeOffset.Now).SubscribeAsync(Print<DateTimeOffset>()); // TODO: Use ForEachAsync.
  111. }
  112. static IAsyncObserver<T> Print<T>()
  113. {
  114. return AsyncObserver.Create<T>(
  115. x =>
  116. {
  117. Console.WriteLine(x);
  118. return Task.CompletedTask;
  119. },
  120. ex =>
  121. {
  122. Console.WriteLine("Error: " + ex);
  123. return Task.CompletedTask;
  124. },
  125. () =>
  126. {
  127. Console.WriteLine("Completed");
  128. return Task.CompletedTask;
  129. }
  130. );
  131. }
  132. }
  133. }