| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300 | 
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information. 
 
- using System;
 
- using System.Linq;
 
- using System.Reactive;
 
- using System.Reactive.Concurrency;
 
- using System.Reactive.Linq;
 
- using System.Reactive.Subjects;
 
- using System.Threading.Tasks;
 
namespace Playground
{
    /// <summary>
    /// Console playground holding one small demo method per async observable operator.
    /// Each demo builds a sequence and subscribes a console-printing observer to it.
    /// </summary>
    static class Program
    {
        /// <summary>
        /// Process entry point: blocks until <see cref="MainAsync"/> has finished,
        /// then waits for a line on standard input before returning.
        /// </summary>
        static void Main()
        {
            var demos = MainAsync();
            demos.GetAwaiter().GetResult();

            Console.ReadLine();
        }

        /// <summary>
        /// Demo switchboard. Every call is commented out; uncomment a line to run that demo.
        /// </summary>
        static async Task MainAsync()
        {
            //await AggregateAsync();
            //await AllAsync();
            //await AnyAsync();
            //await AppendAsync();
            //await AwaitAsync();
            //await BufferTimeHoppingAsync();
            //await BufferTimeSlidingAsync();
            //await CombineLatestAsync();
            //await ConcatAsync();
            //await DelayAsync();
            //await GroupByAsync();
            //await GroupBySelectManyAsync();
            //await MergeAsync();
            //await PrependAsync();
            //await RangeAsync();
            //await ReplaySubjectAsync();
            //await ReturnAsync();
            //await SelectManyAsync();
            //await SubjectAsync();
            //await TakeUntilAsync();
            //await TimerAsync();
            //await WhileAsync();
        }

        /// <summary>Folds Range(0, 10) with addition (seed 0) and prints the outcome.</summary>
        static async Task AggregateAsync() =>
            await AsyncObservable.Range(0, 10).Aggregate(0, (total, item) => total + item).SubscribeAsync(Print<int>());

        /// <summary>Applies All with the predicate "less than 10" to Range(0, 10) and prints the outcome.</summary>
        static async Task AllAsync() =>
            await AsyncObservable.Range(0, 10).All(item => item < 10).SubscribeAsync(Print<bool>());

        /// <summary>Applies Any with the predicate "equals 5" to Range(0, 10) and prints the outcome.</summary>
        static async Task AnyAsync() =>
            await AsyncObservable.Range(0, 10).Any(item => item == 5).SubscribeAsync(Print<bool>());

        /// <summary>Applies Append(42) to Range(0, 10) and prints the resulting sequence.</summary>
        static async Task AppendAsync() =>
            await AsyncObservable.Range(0, 10).Append(42).SubscribeAsync(Print<int>());

        /// <summary>Awaits Range(0, 10) directly and writes the awaited value to the console.</summary>
        static async Task AwaitAsync()
        {
            var awaited = await AsyncObservable.Range(0, 10);
            Console.WriteLine(awaited);
        }

        /// <summary>
        /// Buffers a 300 ms interval with a 1 s time span and prints each buffer
        /// as a comma-separated line.
        /// </summary>
        static async Task BufferTimeHoppingAsync()
        {
            var ticks = AsyncObservable.Interval(TimeSpan.FromMilliseconds(300));

            var lines = ticks
                .Buffer(TimeSpan.FromSeconds(1))
                .Select(batch => string.Join(", ", batch));

            await lines.SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
        }

        /// <summary>
        /// Timestamps a 100 ms interval, buffers it with time arguments of 1 s and 300 ms,
        /// and prints the first/last timestamp of each buffer plus the milliseconds between them.
        /// </summary>
        static async Task BufferTimeSlidingAsync()
        {
            var stamped = AsyncObservable
                .Interval(TimeSpan.FromMilliseconds(100))
                .Timestamp(TaskPoolAsyncScheduler.Default);

            var windows = stamped.Buffer(TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(300));

            var lines = windows.Select(window =>
            {
                var first = window.First().Timestamp;
                var last = window.Last().Timestamp;

                return $"[{first}, {last}] = {(last - first).TotalMilliseconds}";
            });

            await lines.SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
        }

        /// <summary>
        /// Combines two timestamped intervals (250 ms and 333 ms, 10 elements each)
        /// and prints each combined pair as "left, right".
        /// </summary>
        static async Task CombineLatestAsync()
        {
            var fast = AsyncObservable.Interval(TimeSpan.FromMilliseconds(250)).Take(10).Timestamp();
            var slow = AsyncObservable.Interval(TimeSpan.FromMilliseconds(333)).Take(10).Timestamp();

            var combined = AsyncObservable.CombineLatest(
                fast,
                slow,
                (left, right) => left.ToString() + ", " + right.ToString());

            await combined.SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
        }

        /// <summary>Concatenates four ranges of five integers (starting at 0, 5, 10, 15) and prints them.</summary>
        static async Task ConcatAsync()
        {
            var joined = AsyncObservable.Concat(
                AsyncObservable.Range(0, 5),
                AsyncObservable.Range(5, 5),
                AsyncObservable.Range(10, 5),
                AsyncObservable.Range(15, 5));

            await joined.SubscribeAsync(Print<int>()); // TODO: Use ForEachAsync.
        }

        /// <summary>
        /// Timestamps a 1 s timer before and after a 2500 ms Delay and prints, per element,
        /// a TimeInterval built from the value and the difference between the two timestamps.
        /// </summary>
        static async Task DelayAsync()
        {
            var stampedBefore = AsyncObservable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1)).Timestamp();
            var stampedAfter = stampedBefore.Delay(TimeSpan.FromMilliseconds(2500)).Timestamp();

            var lines = stampedAfter.Select(outer =>
                new TimeInterval<long>(outer.Value.Value, outer.Timestamp - outer.Value.Timestamp).ToString());

            await lines.SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
        }

        /// <summary>
        /// Groups 20 timestamped interval ticks by (timestamp millisecond / 100) and subscribes
        /// to every group individually, printing "key - element" for its members.
        /// </summary>
        static async Task GroupByAsync()
        {
            var groups = AsyncObservable.Interval(TimeSpan.FromMilliseconds(250))
                .Timestamp()
                .Take(20)
                .GroupBy(stamped => stamped.Timestamp.Millisecond / 100);

            await groups.SubscribeAsync(async grp =>
            {
                var labelled = grp.Select(item => grp.Key + " - " + item);

                await labelled.SubscribeAsync(Print<string>());
            });
        }

        /// <summary>
        /// Same grouping as <see cref="GroupByAsync"/>, but the groups are flattened with
        /// SelectMany so that a single observer prints every "key - element" line.
        /// </summary>
        static async Task GroupBySelectManyAsync()
        {
            var groups = AsyncObservable.Interval(TimeSpan.FromMilliseconds(250))
                .Timestamp()
                .Take(20)
                .GroupBy(stamped => stamped.Timestamp.Millisecond / 100);

            var flattened = groups.SelectMany(grp => grp, (grp, item) => grp.Key + " - " + item);

            await flattened.SubscribeAsync(Print<string>());
        }

        /// <summary>
        /// Subscribes to the Merge of a subject of sequences, then pushes Range(0, 1) through
        /// Range(0, 10) into the subject and completes it.
        /// </summary>
        static async Task MergeAsync()
        {
            var sources = new SequentialSimpleAsyncSubject<IAsyncObservable<int>>();
            var merged = sources.Merge();

            await merged.SubscribeAsync(Print<int>());

            foreach (var count in Enumerable.Range(1, 10))
            {
                await sources.OnNextAsync(AsyncObservable.Range(0, count));
            }

            await sources.OnCompletedAsync();
        }

        /// <summary>Applies Prepend(42) to Range(0, 10) and prints the resulting sequence.</summary>
        static async Task PrependAsync() =>
            await AsyncObservable.Range(0, 10).Prepend(42).SubscribeAsync(Print<int>());

        /// <summary>Prints Range(0, 10) through the yielding observer from <see cref="PrintAsync{T}"/>.</summary>
        static async Task RangeAsync() =>
            await AsyncObservable.Range(0, 10).SubscribeAsync(PrintAsync<int>()); // TODO: Use ForEachAsync.

        /// <summary>
        /// Drives a replay subject constructed with argument 5: three observers (tagged "1> ",
        /// "2> ", "3> ") subscribe and are disposed at different points while the values
        /// 40 through 47 are pushed.
        /// </summary>
        static async Task ReplaySubjectAsync()
        {
            var replay = new SequentialReplayAsyncSubject<int>(5);

            var first = await replay.SubscribeAsync(value => Console.WriteLine("1> " + value));

            await replay.OnNextAsync(40);
            await replay.OnNextAsync(41);

            var second = await replay.SubscribeAsync(value => Console.WriteLine("2> " + value));

            await replay.OnNextAsync(42);

            await first.DisposeAsync();

            await replay.OnNextAsync(43);

            var third = await replay.SubscribeAsync(value => Console.WriteLine("3> " + value));

            await replay.OnNextAsync(44);
            await replay.OnNextAsync(45);

            await third.DisposeAsync();

            await replay.OnNextAsync(46);

            await second.DisposeAsync();

            await replay.OnNextAsync(47);
        }

        /// <summary>Prints the sequence produced by Return(42).</summary>
        static async Task ReturnAsync() =>
            await AsyncObservable.Return(42).SubscribeAsync(Print<int>());

        /// <summary>
        /// For each i in Range(0, 10), projects Range(i * 10, 10) and prints every
        /// "i -> j" combination.
        /// </summary>
        static async Task SelectManyAsync()
        {
            // Method-syntax form of the nested "from ... from ... select" query.
            var pairs = AsyncObservable.Range(0, 10)
                .SelectMany(i => AsyncObservable.Range(i * 10, 10), (i, j) => i + " -> " + j);

            await pairs.SubscribeAsync(Print<string>());
        }

        /// <summary>
        /// Subscribes to a subject filtered to even values and incremented by one,
        /// then pushes 0 through 9 into the subject and completes it.
        /// </summary>
        static async Task SubjectAsync()
        {
            var subject = new SequentialSimpleAsyncSubject<int>();

            // Query-syntax form of Where(x => x % 2 == 0).Select(x => x + 1).
            var evensPlusOne = from x in subject
                               where x % 2 == 0
                               select x + 1;

            await evensPlusOne.SubscribeAsync(Print<int>());

            foreach (var value in Enumerable.Range(0, 10))
            {
                await subject.OnNextAsync(value);
            }

            await subject.OnCompletedAsync();
        }

        /// <summary>
        /// Prints Range(0, int.MaxValue) limited by TakeUntil with a deadline five seconds
        /// after the current local time.
        /// </summary>
        static async Task TakeUntilAsync()
        {
            var numbers = AsyncObservable.Range(0, int.MaxValue);
            var deadline = DateTimeOffset.Now.AddSeconds(5);

            await numbers.TakeUntil(deadline).SubscribeAsync(Print<int>()); // TODO: Use ForEachAsync.
        }

        /// <summary>
        /// Takes five ticks from a timer created with arguments of 1 s and 2 s and prints
        /// the current local time for each tick.
        /// </summary>
        static async Task TimerAsync()
        {
            var timer = AsyncObservable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2));
            var stamps = timer.Take(5).Select(_ => DateTimeOffset.Now);

            await stamps.SubscribeAsync(Print<DateTimeOffset>()); // TODO: Use ForEachAsync.
        }

        /// <summary>
        /// Prints the sequence built by While from Range(0, 5) and a condition that
        /// pre-increments a captured counter and compares it against 5.
        /// </summary>
        static async Task WhileAsync()
        {
            var iteration = 0;

            var repeated = AsyncObservable.While(() => ++iteration < 5, AsyncObservable.Range(0, 5));

            await repeated.SubscribeAsync(Print<int>()); // TODO: Use ForEachAsync.
        }

        /// <summary>
        /// Creates an observer that writes each element, "Error: " plus the exception, or
        /// "Completed" to the console; every callback returns an already-completed task.
        /// </summary>
        /// <typeparam name="T">Element type of the observed sequence.</typeparam>
        /// <returns>The console-printing observer.</returns>
        static IAsyncObserver<T> Print<T>()
        {
            var observer = AsyncObserver.Create<T>(
                value =>
                {
                    Console.WriteLine(value);
                    return Task.CompletedTask;
                },
                error =>
                {
                    Console.WriteLine("Error: " + error);
                    return Task.CompletedTask;
                },
                () =>
                {
                    Console.WriteLine("Completed");
                    return Task.CompletedTask;
                });

            return observer;
        }

        /// <summary>
        /// Creates an observer like <see cref="Print{T}"/>, except that every callback is an
        /// async lambda that awaits <see cref="Task.Yield"/> before writing to the console.
        /// </summary>
        /// <typeparam name="T">Element type of the observed sequence.</typeparam>
        /// <returns>The yielding console-printing observer.</returns>
        static IAsyncObserver<T> PrintAsync<T>()
        {
            var observer = AsyncObserver.Create<T>(
                async value =>
                {
                    await Task.Yield();
                    Console.WriteLine(value);
                },
                async error =>
                {
                    await Task.Yield();
                    Console.WriteLine("Error: " + error);
                },
                async () =>
                {
                    await Task.Yield();
                    Console.WriteLine("Completed");
                });

            return observer;
        }
    }
}
 
 
  |