瀏覽代碼

Adding ToEventPattern.

Bart De Smet 8 年之前
父節點
當前提交
0c763c7ad9

+ 27 - 0
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Internal/EventPatternSource.cs

@@ -0,0 +1,27 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+namespace System.Reactive
+{
+    internal sealed class EventPatternSource<TEventArgs> : EventPatternSourceBase<object, TEventArgs>, IEventPatternSource<TEventArgs>
+    {
+        public EventPatternSource(IAsyncObservable<EventPattern<object, TEventArgs>> source, Action<Action<object, TEventArgs>, /*object,*/ EventPattern<object, TEventArgs>> invokeHandler)
+            : base(source, invokeHandler)
+        {
+        }
+
+        event EventHandler<TEventArgs> IEventPatternSource<TEventArgs>.OnNext
+        {
+            add
+            {
+                Add(value, (o, e) => value(o, e));
+            }
+
+            remove
+            {
+                Remove(value);
+            }
+        }
+    }
+}

+ 126 - 0
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Internal/EventPatternSourceBase.cs

@@ -0,0 +1,126 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+namespace System.Reactive
+{
+    /// <summary>
+    /// Base class for classes that expose an observable sequence as a well-known event pattern (sender, event arguments).
+    /// Contains functionality to maintain a map of event handler delegates to observable sequence subscriptions. Subclasses
+    /// should only add an event with custom add and remove methods calling into the base class's operations.
+    /// </summary>
+    /// <typeparam name="TSender">The type of the sender that raises the event.</typeparam>
+    /// <typeparam name="TEventArgs">The type of the event data generated by the event.</typeparam>
+    internal abstract class EventPatternSourceBase<TSender, TEventArgs>
+    {
+        private readonly IAsyncObservable<EventPattern<TSender, TEventArgs>> _source;
+        private readonly Dictionary<Delegate, Stack<IDisposable>> _subscriptions;
+        private readonly Action<Action<TSender, TEventArgs>, /*object,*/ EventPattern<TSender, TEventArgs>> _invokeHandler;
+
+        /// <summary>
+        /// Creates a new event pattern source.
+        /// </summary>
+        /// <param name="source">Source sequence to expose as an event.</param>
+        /// <param name="invokeHandler">Delegate used to invoke the event for each element of the sequence.</param>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="invokeHandler"/> is null.</exception>
+        protected EventPatternSourceBase(IAsyncObservable<EventPattern<TSender, TEventArgs>> source, Action<Action<TSender, TEventArgs>, /*object,*/ EventPattern<TSender, TEventArgs>> invokeHandler)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (invokeHandler == null)
+                throw new ArgumentNullException(nameof(invokeHandler));
+
+            _source = source;
+            _invokeHandler = invokeHandler;
+            _subscriptions = new Dictionary<Delegate, Stack<IDisposable>>();
+        }
+
+        /// <summary>
+        /// Adds the specified event handler, causing a subscription to the underlying source.
+        /// </summary>
+        /// <param name="handler">Event handler to add. The same delegate should be passed to the Remove operation in order to remove the event handler.</param>
+        /// <param name="invoke">Invocation delegate to raise the event in the derived class.</param>
+        /// <exception cref="ArgumentNullException"><paramref name="handler"/> or <paramref name="invoke"/> is null.</exception>
+        protected void Add(Delegate handler, Action<TSender, TEventArgs> invoke)
+        {
+            if (handler == null)
+                throw new ArgumentNullException(nameof(handler));
+            if (invoke == null)
+                throw new ArgumentNullException(nameof(invoke));
+
+            var gate = new object();
+            var isAdded = false;
+            var isDone = false;
+
+            var remove = new Action(() =>
+            {
+                lock (gate)
+                {
+                    if (isAdded)
+                        Remove(handler);
+                    else
+                        isDone = true;
+                }
+            });
+
+            //
+            // [OK] Use of unsafe SubscribeAsync: non-pretentious wrapper of an observable in an event; exceptions can occur during +=.
+            //
+            var d = _source.SubscribeAsync(
+                x => { _invokeHandler(invoke, /*this,*/ x); return Task.CompletedTask; },
+                ex => { remove(); return Task.FromException(ex); },
+                () => { remove(); return Task.CompletedTask; }
+            );
+
+            lock (gate)
+            {
+                if (!isDone)
+                {
+                    Add(handler, d);
+                    isAdded = true;
+                }
+            }
+        }
+
+        private void Add(Delegate handler, IDisposable disposable)
+        {
+            lock (_subscriptions)
+            {
+                var l = new Stack<IDisposable>();
+                if (!_subscriptions.TryGetValue(handler, out l))
+                    _subscriptions[handler] = l = new Stack<IDisposable>();
+
+                l.Push(disposable);
+            }
+        }
+
+        /// <summary>
+        /// Removes the specified event handler, causing a disposal of the corresponding subscription to the underlying source that was created during the Add operation.
+        /// </summary>
+        /// <param name="handler">Event handler to remove. This should be the same delegate as one that was passed to the Add operation.</param>
+        /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
+        protected void Remove(Delegate handler)
+        {
+            if (handler == null)
+                throw new ArgumentNullException(nameof(handler));
+
+            var d = default(IDisposable);
+
+            lock (_subscriptions)
+            {
+                var l = new Stack<IDisposable>();
+                if (_subscriptions.TryGetValue(handler, out l))
+                {
+                    d = l.Pop();
+                    if (l.Count == 0)
+                        _subscriptions.Remove(handler);
+                }
+            }
+
+            d?.Dispose();
+        }
+    }
+}

+ 3 - 3
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Internal/EventSource.cs

@@ -11,9 +11,9 @@ namespace System.Reactive
     {
         private readonly IAsyncObservable<T> _source;
         private readonly Dictionary<Delegate, Stack<IDisposable>> _subscriptions;
-        private readonly Func<Action<T>, /*object,*/ T, Task> _invokeHandler;
+        private readonly Action<Action<T>, /*object,*/ T> _invokeHandler;
 
-        public EventSource(IAsyncObservable<T> source, Func<Action<T>, /*object,*/ T, Task> invokeHandler)
+        public EventSource(IAsyncObservable<T> source, Action<Action<T>, /*object,*/ T> invokeHandler)
         {
             _source = source;
             _invokeHandler = invokeHandler;
@@ -43,7 +43,7 @@ namespace System.Reactive
                 // [OK] Use of unsafe SubscribeAsync: non-pretentious wrapper of an observable in an event; exceptions can occur during +=.
                 //
                 var d = _source.SubscribeAsync(
-                    x => _invokeHandler(value, /*this,*/ x),
+                    x => { _invokeHandler(value, /*this,*/ x); return Task.CompletedTask; },
                     ex => { remove(); return Task.FromException(ex); },
                     () => { remove(); return Task.CompletedTask; }
                 );

+ 2 - 10
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/ToEvent.cs

@@ -13,11 +13,7 @@ namespace System.Reactive.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return new EventSource<Unit>(source, (onNext, _) =>
-            {
-                onNext(Unit.Default);
-                return Task.CompletedTask;
-            });
+            return new EventSource<Unit>(source, (onNext, _) => onNext(Unit.Default));
         }
 
         public static IEventSource<TSource> ToEvent<TSource>(this IAsyncObservable<TSource> source)
@@ -25,11 +21,7 @@ namespace System.Reactive.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return new EventSource<TSource>(source, (onNext, x) =>
-            {
-                onNext(x);
-                return Task.CompletedTask;
-            });
+            return new EventSource<TSource>(source, (onNext, x) => onNext(x));
         }
     }
 }

+ 17 - 0
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/ToEventPattern.cs

@@ -0,0 +1,17 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+namespace System.Reactive.Linq
+{
+    partial class AsyncObservable
+    {
+        public static IEventPatternSource<TEventArgs> ToEventPattern<TEventArgs>(this IAsyncObservable<EventPattern<TEventArgs>> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return new EventPatternSource<TEventArgs>(source, (onNext, e) => onNext(e.Sender, e.EventArgs));
+        }
+    }
+}