Program.cs 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Linq;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Linq;
  8. using System.Reactive.Subjects;
  9. using System.Threading.Tasks;
  10. namespace Playground
  11. {
  12. static class Program
  13. {
  14. static void Main()
  15. {
  16. MainAsync().GetAwaiter().GetResult();
  17. Console.ReadLine();
  18. }
  19. static async Task MainAsync()
  20. {
  21. await BufferTimeHoppingAsync();
  22. await BufferTimeSlidingAsync();
  23. await MergeAsync();
  24. await RangeAsync();
  25. await ReturnAsync();
  26. await SelectManyAsync();
  27. await SubjectAsync();
  28. await TakeUntilAsync();
  29. await TimerAsync();
  30. }
  31. static async Task BufferTimeHoppingAsync()
  32. {
  33. await
  34. AsyncObservable
  35. .Interval(TimeSpan.FromMilliseconds(300))
  36. .Buffer(TimeSpan.FromSeconds(1))
  37. .Select(xs => string.Join(", ", xs))
  38. .SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
  39. }
  40. static async Task BufferTimeSlidingAsync()
  41. {
  42. await
  43. AsyncObservable
  44. .Interval(TimeSpan.FromMilliseconds(100))
  45. .Timestamp(TaskPoolAsyncScheduler.Default)
  46. .Buffer(TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(300))
  47. .Select(xs => $"[{xs.First().Timestamp}, {xs.Last().Timestamp}] = {(xs.Last().Timestamp - xs.First().Timestamp).TotalMilliseconds}")
  48. .SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
  49. }
  50. static async Task MergeAsync()
  51. {
  52. var subject = new SequentialSimpleAsyncSubject<IAsyncObservable<int>>();
  53. var res = subject.Merge();
  54. await res.SubscribeAsync(Print<int>());
  55. for (var i = 1; i <= 10; i++)
  56. {
  57. await subject.OnNextAsync(AsyncObservable.Range(0, i));
  58. }
  59. await subject.OnCompletedAsync();
  60. }
  61. static async Task RangeAsync()
  62. {
  63. await AsyncObservable.Range(0, 10).SubscribeAsync(Print<int>()); // TODO: Use ForEachAsync.
  64. }
  65. static async Task ReturnAsync()
  66. {
  67. await AsyncObservable.Return(42).SubscribeAsync(Print<int>());
  68. }
  69. static async Task SelectManyAsync()
  70. {
  71. var res = from i in AsyncObservable.Range(0, 10)
  72. from j in AsyncObservable.Range(i * 10, 10)
  73. select i + " -> " + j;
  74. await res.SubscribeAsync(Print<string>());
  75. }
  76. static async Task SubjectAsync()
  77. {
  78. var subject = new SequentialSimpleAsyncSubject<int>();
  79. var res = subject.Where(x => x % 2 == 0).Select(x => x + 1);
  80. await res.SubscribeAsync(Print<int>());
  81. for (var i = 0; i < 10; i++)
  82. {
  83. await subject.OnNextAsync(i);
  84. }
  85. await subject.OnCompletedAsync();
  86. }
  87. static async Task TakeUntilAsync()
  88. {
  89. await AsyncObservable.Range(0, int.MaxValue).TakeUntil(DateTimeOffset.Now.AddSeconds(5)).SubscribeAsync(Print<int>()); // TODO: Use ForEachAsync.
  90. }
  91. static async Task TimerAsync()
  92. {
  93. await AsyncObservable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2)).Take(5).Select(_ => DateTimeOffset.Now).SubscribeAsync(Print<DateTimeOffset>()); // TODO: Use ForEachAsync.
  94. }
  95. static IAsyncObserver<T> Print<T>()
  96. {
  97. return AsyncObserver.Create<T>(
  98. x =>
  99. {
  100. Console.WriteLine(x);
  101. return Task.CompletedTask;
  102. },
  103. ex =>
  104. {
  105. Console.WriteLine("Error: " + ex);
  106. return Task.CompletedTask;
  107. },
  108. () =>
  109. {
  110. Console.WriteLine("Completed");
  111. return Task.CompletedTask;
  112. }
  113. );
  114. }
  115. }
  116. }