1
0
Bart De Smet 8 жил өмнө
parent
commit
f0cf4d5a5d

+ 73 - 0
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Internal/Lookup.cs

@@ -0,0 +1,73 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Collections;
+using System.Collections.Generic;
+using System.Linq;
+
+namespace System.Reactive
+{
+    internal sealed class Lookup<K, E> : ILookup<K, E>
+    {
+        private readonly Dictionary<K, List<E>> d;
+
+        public Lookup(IEqualityComparer<K> comparer)
+        {
+            d = new Dictionary<K, List<E>>(comparer);
+        }
+
+        public void Add(K key, E element)
+        {
+            if (!d.TryGetValue(key, out var list))
+                d[key] = list = new List<E>();
+
+            list.Add(element);
+        }
+
+        public bool Contains(K key) => d.ContainsKey(key);
+
+        public int Count => d.Count;
+
+        public IEnumerable<E> this[K key]
+        {
+            get
+            {
+                if (!d.TryGetValue(key, out var list))
+                    return Enumerable.Empty<E>();
+
+                return Hide(list);
+            }
+        }
+
+        private IEnumerable<E> Hide(List<E> elements)
+        {
+            foreach (var x in elements)
+                yield return x;
+        }
+
+        public IEnumerator<IGrouping<K, E>> GetEnumerator()
+        {
+            foreach (var kv in d)
+                yield return new Grouping(kv);
+        }
+
+        private sealed class Grouping : IGrouping<K, E>
+        {
+            private readonly KeyValuePair<K, List<E>> kv;
+
+            public Grouping(KeyValuePair<K, List<E>> kv)
+            {
+                this.kv = kv;
+            }
+
+            public K Key => kv.Key;
+
+            public IEnumerator<E> GetEnumerator() => kv.Value.GetEnumerator();
+
+            IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
+        }
+
+        IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
+    }
+}

+ 146 - 0
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/ToLookup.cs

@@ -0,0 +1,146 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+
+namespace System.Reactive.Linq
+{
+    partial class AsyncObservable
+    {
+        public static IAsyncObservable<ILookup<TKey, TValue>> ToLookup<TSource, TKey, TValue>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TValue> valueSelector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (keySelector == null)
+                throw new ArgumentNullException(nameof(keySelector));
+            if (valueSelector == null)
+                throw new ArgumentNullException(nameof(valueSelector));
+
+            return Create<ILookup<TKey, TValue>>(observer => source.SubscribeSafeAsync(AsyncObserver.ToLookup(observer, keySelector, valueSelector)));
+        }
+
+        public static IAsyncObservable<ILookup<TKey, TValue>> ToLookup<TSource, TKey, TValue>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TValue> valueSelector, IEqualityComparer<TKey> comparer)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (keySelector == null)
+                throw new ArgumentNullException(nameof(keySelector));
+            if (valueSelector == null)
+                throw new ArgumentNullException(nameof(valueSelector));
+            if (comparer == null)
+                throw new ArgumentNullException(nameof(comparer));
+
+            return Create<ILookup<TKey, TValue>>(observer => source.SubscribeSafeAsync(AsyncObserver.ToLookup(observer, keySelector, valueSelector, comparer)));
+        }
+
+        public static IAsyncObservable<ILookup<TKey, TValue>> ToLookup<TSource, TKey, TValue>(this IAsyncObservable<TSource> source, Func<TSource, Task<TKey>> keySelector, Func<TSource, Task<TValue>> valueSelector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (keySelector == null)
+                throw new ArgumentNullException(nameof(keySelector));
+            if (valueSelector == null)
+                throw new ArgumentNullException(nameof(valueSelector));
+
+            return Create<ILookup<TKey, TValue>>(observer => source.SubscribeSafeAsync(AsyncObserver.ToLookup(observer, keySelector, valueSelector)));
+        }
+
+        public static IAsyncObservable<ILookup<TKey, TValue>> ToLookup<TSource, TKey, TValue>(this IAsyncObservable<TSource> source, Func<TSource, Task<TKey>> keySelector, Func<TSource, Task<TValue>> valueSelector, IEqualityComparer<TKey> comparer)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (keySelector == null)
+                throw new ArgumentNullException(nameof(keySelector));
+            if (valueSelector == null)
+                throw new ArgumentNullException(nameof(valueSelector));
+            if (comparer == null)
+                throw new ArgumentNullException(nameof(comparer));
+
+            return Create<ILookup<TKey, TValue>>(observer => source.SubscribeSafeAsync(AsyncObserver.ToLookup(observer, keySelector, valueSelector, comparer)));
+        }
+    }
+
+    partial class AsyncObserver
+    {
+        public static IAsyncObserver<TSource> ToLookup<TSource, TKey, TValue>(IAsyncObserver<ILookup<TKey, TValue>> observer, Func<TSource, TKey> keySelector, Func<TSource, TValue> valueSelector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (keySelector == null)
+                throw new ArgumentNullException(nameof(keySelector));
+            if (valueSelector == null)
+                throw new ArgumentNullException(nameof(valueSelector));
+
+            return ToLookup(observer, keySelector, valueSelector, EqualityComparer<TKey>.Default);
+        }
+
+        public static IAsyncObserver<TSource> ToLookup<TSource, TKey, TValue>(IAsyncObserver<ILookup<TKey, TValue>> observer, Func<TSource, TKey> keySelector, Func<TSource, TValue> valueSelector, IEqualityComparer<TKey> comparer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (keySelector == null)
+                throw new ArgumentNullException(nameof(keySelector));
+            if (valueSelector == null)
+                throw new ArgumentNullException(nameof(valueSelector));
+            if (comparer == null)
+                throw new ArgumentNullException(nameof(comparer));
+
+            return Aggregate<TSource, Lookup<TKey, TValue>, ILookup<TKey, TValue>>(
+                observer,
+                new Lookup<TKey, TValue>(comparer),
+                (d, x) =>
+                {
+                    var key = keySelector(x);
+                    var value = valueSelector(x);
+
+                    d.Add(key, value);
+
+                    return d;
+                },
+                d => d
+            );
+        }
+
+        public static IAsyncObserver<TSource> ToLookup<TSource, TKey, TValue>(IAsyncObserver<ILookup<TKey, TValue>> observer, Func<TSource, Task<TKey>> keySelector, Func<TSource, Task<TValue>> valueSelector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (keySelector == null)
+                throw new ArgumentNullException(nameof(keySelector));
+            if (valueSelector == null)
+                throw new ArgumentNullException(nameof(valueSelector));
+
+            return ToLookup(observer, keySelector, valueSelector, EqualityComparer<TKey>.Default);
+        }
+
+        public static IAsyncObserver<TSource> ToLookup<TSource, TKey, TValue>(IAsyncObserver<ILookup<TKey, TValue>> observer, Func<TSource, Task<TKey>> keySelector, Func<TSource, Task<TValue>> valueSelector, IEqualityComparer<TKey> comparer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (keySelector == null)
+                throw new ArgumentNullException(nameof(keySelector));
+            if (valueSelector == null)
+                throw new ArgumentNullException(nameof(valueSelector));
+            if (comparer == null)
+                throw new ArgumentNullException(nameof(comparer));
+
+            return Aggregate<TSource, Lookup<TKey, TValue>, ILookup<TKey, TValue>>(
+                observer,
+                new Lookup<TKey, TValue>(comparer),
+                async (d, x) =>
+                {
+                    var key = await keySelector(x).ConfigureAwait(false);
+                    var value = await valueSelector(x).ConfigureAwait(false);
+
+                    d.Add(key, value);
+
+                    return d;
+                },
+                d => Task.FromResult<ILookup<TKey, TValue>>(d)
+            );
+        }
+    }
+}