Browse Source

Adding some overloads for Min and Max.

Bart De Smet 8 years ago
parent
commit
9d5297fa26

+ 84 - 0
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Max.cs

@@ -2,12 +2,96 @@
 // The .NET Foundation licenses this file to you under the Apache 2.0 License.
 // See the LICENSE file in the project root for more information. 
 
+using System.Collections.Generic;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
 {
+    partial class AsyncObservable
+    {
+        public static IAsyncObservable<TSource> Max<TSource>(IAsyncObservable<TSource> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<TSource>(observer => source.SubscribeAsync(AsyncObserver.Max(observer)));
+        }
+
+        public static IAsyncObservable<TSource> Max<TSource>(IAsyncObservable<TSource> source, IComparer<TSource> comparer)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (comparer == null)
+                throw new ArgumentNullException(nameof(comparer));
+
+            return Create<TSource>(observer => source.SubscribeAsync(AsyncObserver.Max(observer, comparer)));
+        }
+    }
+
     partial class AsyncObserver
     {
+        public static IAsyncObserver<TSource> Max<TSource>(IAsyncObserver<TSource> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            return Max(observer, Comparer<TSource>.Default);
+        }
+
+        public static IAsyncObserver<TSource> Max<TSource>(IAsyncObserver<TSource> observer, IComparer<TSource> comparer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (comparer == null)
+                throw new ArgumentNullException(nameof(comparer));
+
+            var max = default(TSource);
+            var found = false;
+
+            return Create<TSource>(
+                async x =>
+                {
+                    if (found)
+                    {
+                        bool isGreater;
+
+                        try
+                        {
+                            isGreater = comparer.Compare(x, max) > 0;
+                        }
+                        catch (Exception ex)
+                        {
+                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                            return;
+                        }
+
+                        if (isGreater)
+                        {
+                            max = x;
+                        }
+                    }
+                    else
+                    {
+                        max = x;
+                        found = true;
+                    }
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    if (!found)
+                    {
+                        await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
+                    }
+                    else
+                    {
+                        await observer.OnNextAsync(max).ConfigureAwait(false);
+                        await observer.OnCompletedAsync().ConfigureAwait(false);
+                    }
+                }
+            );
+        }
+
         public static IAsyncObserver<int> MaxInt32(IAsyncObserver<int> observer)
         {
             if (observer == null)

+ 84 - 0
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Min.cs

@@ -2,12 +2,96 @@
 // The .NET Foundation licenses this file to you under the Apache 2.0 License.
 // See the LICENSE file in the project root for more information. 
 
+using System.Collections.Generic;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
 {
+    partial class AsyncObservable
+    {
+        public static IAsyncObservable<TSource> Min<TSource>(IAsyncObservable<TSource> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<TSource>(observer => source.SubscribeAsync(AsyncObserver.Min(observer)));
+        }
+
+        public static IAsyncObservable<TSource> Min<TSource>(IAsyncObservable<TSource> source, IComparer<TSource> comparer)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (comparer == null)
+                throw new ArgumentNullException(nameof(comparer));
+
+            return Create<TSource>(observer => source.SubscribeAsync(AsyncObserver.Min(observer, comparer)));
+        }
+    }
+
     partial class AsyncObserver
     {
+        public static IAsyncObserver<TSource> Min<TSource>(IAsyncObserver<TSource> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            return Min(observer, Comparer<TSource>.Default);
+        }
+
+        public static IAsyncObserver<TSource> Min<TSource>(IAsyncObserver<TSource> observer, IComparer<TSource> comparer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (comparer == null)
+                throw new ArgumentNullException(nameof(comparer));
+
+            var min = default(TSource);
+            var found = false;
+
+            return Create<TSource>(
+                async x =>
+                {
+                    if (found)
+                    {
+                        bool isGreater;
+
+                        try
+                        {
+                            isGreater = comparer.Compare(x, min) < 0;
+                        }
+                        catch (Exception ex)
+                        {
+                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                            return;
+                        }
+
+                        if (isGreater)
+                        {
+                            min = x;
+                        }
+                    }
+                    else
+                    {
+                        min = x;
+                        found = true;
+                    }
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    if (!found)
+                    {
+                        await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
+                    }
+                    else
+                    {
+                        await observer.OnNextAsync(min).ConfigureAwait(false);
+                        await observer.OnCompletedAsync().ConfigureAwait(false);
+                    }
+                }
+            );
+        }
+
         public static IAsyncObserver<int> MinInt32(this IAsyncObserver<int> observer)
         {
             if (observer == null)