Program.cs 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. using System;
  2. using System.Linq;
  3. using System.Reactive.Concurrency;
  4. using System.Reactive.Linq;
  5. using System.Reactive.Subjects;
  6. using System.Threading.Tasks;
  7. namespace Playground
  8. {
  9. static class Program
  10. {
  11. static void Main()
  12. {
  13. MainAsync().GetAwaiter().GetResult();
  14. Console.ReadLine();
  15. }
  16. static async Task MainAsync()
  17. {
  18. await BufferTimeHoppingAsync();
  19. await BufferTimeSlidingAsync();
  20. await MergeAsync();
  21. await RangeAsync();
  22. await ReturnAsync();
  23. await SelectManyAsync();
  24. await SubjectAsync();
  25. await TakeUntilAsync();
  26. await TimerAsync();
  27. }
  28. static async Task BufferTimeHoppingAsync()
  29. {
  30. await
  31. AsyncObservable
  32. .Interval(TimeSpan.FromMilliseconds(300))
  33. .Buffer(TimeSpan.FromSeconds(1))
  34. .Select(xs => string.Join(", ", xs))
  35. .SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
  36. }
  37. static async Task BufferTimeSlidingAsync()
  38. {
  39. await
  40. AsyncObservable
  41. .Interval(TimeSpan.FromMilliseconds(100))
  42. .Timestamp(TaskPoolAsyncScheduler.Default)
  43. .Buffer(TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(300))
  44. .Select(xs => $"[{xs.First().Timestamp}, {xs.Last().Timestamp}] = {(xs.Last().Timestamp - xs.First().Timestamp).TotalMilliseconds}")
  45. .SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
  46. }
  47. static async Task MergeAsync()
  48. {
  49. var subject = new SequentialSimpleAsyncSubject<IAsyncObservable<int>>();
  50. var res = subject.Merge();
  51. await res.SubscribeAsync(Print<int>());
  52. for (var i = 1; i <= 10; i++)
  53. {
  54. await subject.OnNextAsync(AsyncObservable.Range(0, i));
  55. }
  56. await subject.OnCompletedAsync();
  57. }
  58. static async Task RangeAsync()
  59. {
  60. await AsyncObservable.Range(0, 10).SubscribeAsync(Print<int>()); // TODO: Use ForEachAsync.
  61. }
  62. static async Task ReturnAsync()
  63. {
  64. await AsyncObservable.Return(42).SubscribeAsync(Print<int>());
  65. }
  66. static async Task SelectManyAsync()
  67. {
  68. var res = from i in AsyncObservable.Range(0, 10)
  69. from j in AsyncObservable.Range(i * 10, 10)
  70. select i + " -> " + j;
  71. await res.SubscribeAsync(Print<string>());
  72. }
  73. static async Task SubjectAsync()
  74. {
  75. var subject = new SequentialSimpleAsyncSubject<int>();
  76. var res = subject.Where(x => x % 2 == 0).Select(x => x + 1);
  77. await res.SubscribeAsync(Print<int>());
  78. for (var i = 0; i < 10; i++)
  79. {
  80. await subject.OnNextAsync(i);
  81. }
  82. await subject.OnCompletedAsync();
  83. }
  84. static async Task TakeUntilAsync()
  85. {
  86. await AsyncObservable.Range(0, int.MaxValue).TakeUntil(DateTimeOffset.Now.AddSeconds(5)).SubscribeAsync(Print<int>()); // TODO: Use ForEachAsync.
  87. }
  88. static async Task TimerAsync()
  89. {
  90. await AsyncObservable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2)).Take(5).Select(_ => DateTimeOffset.Now).SubscribeAsync(Print<DateTimeOffset>()); // TODO: Use ForEachAsync.
  91. }
  92. static IAsyncObserver<T> Print<T>()
  93. {
  94. return AsyncObserver.Create<T>(
  95. x =>
  96. {
  97. Console.WriteLine(x);
  98. return Task.CompletedTask;
  99. },
  100. ex =>
  101. {
  102. Console.WriteLine("Error: " + ex);
  103. return Task.CompletedTask;
  104. },
  105. () =>
  106. {
  107. Console.WriteLine("Completed");
  108. return Task.CompletedTask;
  109. }
  110. );
  111. }
  112. }
  113. }