1
0
Bart De Smet 8 жил өмнө
parent
commit
001339b0c5

+ 97 - 0
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Internal/EventSource.cs

@@ -0,0 +1,97 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+namespace System.Reactive
+{
+    internal sealed class EventSource<T> : IEventSource<T>
+    {
+        private readonly IAsyncObservable<T> _source;
+        private readonly Dictionary<Delegate, Stack<IDisposable>> _subscriptions;
+        private readonly Func<Action<T>, /*object,*/ T, Task> _invokeHandler;
+
+        public EventSource(IAsyncObservable<T> source, Func<Action<T>, /*object,*/ T, Task> invokeHandler)
+        {
+            _source = source;
+            _invokeHandler = invokeHandler;
+            _subscriptions = new Dictionary<Delegate, Stack<IDisposable>>();
+        }
+
+        public event Action<T> OnNext
+        {
+            add
+            {
+                var gate = new object();
+                var isAdded = false;
+                var isDone = false;
+
+                var remove = new Action(() =>
+                {
+                    lock (gate)
+                    {
+                        if (isAdded)
+                            Remove(value);
+                        else
+                            isDone = true;
+                    }
+                });
+
+                //
+                // [OK] Use of unsafe SubscribeAsync: non-pretentious wrapper of an observable in an event; exceptions can occur during +=.
+                //
+                var d = _source.SubscribeAsync(
+                    x => _invokeHandler(value, /*this,*/ x),
+                    ex => { remove(); return Task.FromException(ex); },
+                    () => { remove(); return Task.CompletedTask; }
+                );
+
+                lock (gate)
+                {
+                    if (!isDone)
+                    {
+                        Add(value, d);
+                        isAdded = true;
+                    }
+                }
+            }
+
+            remove
+            {
+                Remove(value);
+            }
+        }
+
+        private void Add(Delegate handler, IDisposable disposable)
+        {
+            lock (_subscriptions)
+            {
+                var l = new Stack<IDisposable>();
+                if (!_subscriptions.TryGetValue(handler, out l))
+                    _subscriptions[handler] = l = new Stack<IDisposable>();
+
+                l.Push(disposable);
+            }
+        }
+
+        private void Remove(Delegate handler)
+        {
+            var d = default(IDisposable);
+
+            lock (_subscriptions)
+            {
+                var l = new Stack<IDisposable>();
+                if (_subscriptions.TryGetValue(handler, out l))
+                {
+                    d = l.Pop();
+                    if (l.Count == 0)
+                        _subscriptions.Remove(handler);
+                }
+            }
+
+            d?.Dispose();
+        }
+    }
+}

+ 35 - 0
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/ToEvent.cs

@@ -0,0 +1,35 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Threading.Tasks;
+
+namespace System.Reactive.Linq
+{
+    partial class AsyncObservable
+    {
+        public static IEventSource<Unit> ToEvent<TSource>(this IAsyncObservable<Unit> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return new EventSource<Unit>(source, (onNext, _) =>
+            {
+                onNext(Unit.Default);
+                return Task.CompletedTask;
+            });
+        }
+
+        public static IEventSource<TSource> ToEvent<TSource>(this IAsyncObservable<TSource> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return new EventSource<TSource>(source, (onNext, x) =>
+            {
+                onNext(x);
+                return Task.CompletedTask;
+            });
+        }
+    }
+}