// Program.cs
  1. using System;
  2. using System.Reactive.Linq;
  3. using System.Reactive.Subjects;
  4. using System.Threading.Tasks;
  5. namespace Playground
  6. {
  7. static class Program
  8. {
  9. static void Main()
  10. {
  11. MainAsync().GetAwaiter().GetResult();
  12. Console.ReadLine();
  13. }
  14. static async Task MainAsync()
  15. {
  16. await BufferAsync();
  17. await MergeAsync();
  18. await RangeAsync();
  19. await ReturnAsync();
  20. await SubjectAsync();
  21. await TakeUntilAsync();
  22. await TimerAsync();
  23. }
  24. static async Task BufferAsync()
  25. {
  26. await AsyncObservable.Interval(TimeSpan.FromMilliseconds(300)).Buffer(TimeSpan.FromSeconds(1)).Select(xs => string.Join(", ", xs)).SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
  27. }
  28. static async Task MergeAsync()
  29. {
  30. var subject = new SequentialSimpleAsyncSubject<IAsyncObservable<int>>();
  31. var res = subject.Merge();
  32. await res.SubscribeAsync(Print<int>());
  33. for (var i = 1; i <= 10; i++)
  34. {
  35. await subject.OnNextAsync(AsyncObservable.Range(0, i));
  36. }
  37. await subject.OnCompletedAsync();
  38. }
  39. static async Task RangeAsync()
  40. {
  41. await AsyncObservable.Range(0, 10).SubscribeAsync(Print<int>()); // TODO: Use ForEachAsync.
  42. }
  43. static async Task ReturnAsync()
  44. {
  45. await AsyncObservable.Return(42).SubscribeAsync(Print<int>());
  46. }
  47. static async Task SubjectAsync()
  48. {
  49. var subject = new SequentialSimpleAsyncSubject<int>();
  50. var res = subject.Where(x => x % 2 == 0).Select(x => x + 1);
  51. await res.SubscribeAsync(Print<int>());
  52. for (var i = 0; i < 10; i++)
  53. {
  54. await subject.OnNextAsync(i);
  55. }
  56. await subject.OnCompletedAsync();
  57. }
  58. static async Task TakeUntilAsync()
  59. {
  60. await AsyncObservable.Range(0, int.MaxValue).TakeUntil(DateTimeOffset.Now.AddSeconds(5)).SubscribeAsync(Print<int>()); // TODO: Use ForEachAsync.
  61. }
  62. static async Task TimerAsync()
  63. {
  64. await AsyncObservable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2)).Take(5).Select(_ => DateTimeOffset.Now).SubscribeAsync(Print<DateTimeOffset>()); // TODO: Use ForEachAsync.
  65. }
  66. static IAsyncObserver<T> Print<T>()
  67. {
  68. return AsyncObserver.Create<T>(
  69. x =>
  70. {
  71. Console.WriteLine(x);
  72. return Task.CompletedTask;
  73. },
  74. ex =>
  75. {
  76. Console.WriteLine("Error: " + ex);
  77. return Task.CompletedTask;
  78. },
  79. () =>
  80. {
  81. Console.WriteLine("Completed");
  82. return Task.CompletedTask;
  83. }
  84. );
  85. }
  86. }
  87. }