// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;
using System.Threading.Tasks;

namespace System.Reactive.Linq
{
    public partial class AsyncObservable
    {
        /// <summary>
        /// Wraps an event whose handlers are <see cref="Action{T}"/> delegates in an async observable sequence.
        /// The scheduler is chosen from <see cref="SynchronizationContext.Current"/> at the time this method is called.
        /// </summary>
        /// <typeparam name="TEventArgs">Type of the value passed to the event handler and forwarded to observers.</typeparam>
        /// <param name="addHandler">Action that attaches the given handler to the underlying event.</param>
        /// <param name="removeHandler">Action that detaches the given handler from the underlying event.</param>
        /// <returns>An async observable sequence that forwards each handler invocation to its observers.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="addHandler"/> or <paramref name="removeHandler"/> is null.</exception>
        public static IAsyncObservable<TEventArgs> FromEvent<TEventArgs>(Action<Action<TEventArgs>> addHandler, Action<Action<TEventArgs>> removeHandler)
        {
            if (addHandler == null)
                throw new ArgumentNullException(nameof(addHandler));
            if (removeHandler == null)
                throw new ArgumentNullException(nameof(removeHandler));

            // The handler already has the event's delegate type, so the conversion is the identity.
            return FromEventCore<Action<TEventArgs>, TEventArgs>(h => h, addHandler, removeHandler, GetSchedulerForCurrentContext());
        }

        /// <summary>
        /// Wraps an event whose handlers are <see cref="Action{T}"/> delegates in an async observable sequence,
        /// using the specified scheduler for the subscription logic.
        /// </summary>
        /// <typeparam name="TEventArgs">Type of the value passed to the event handler and forwarded to observers.</typeparam>
        /// <param name="addHandler">Action that attaches the given handler to the underlying event.</param>
        /// <param name="removeHandler">Action that detaches the given handler from the underlying event.</param>
        /// <param name="scheduler">Scheduler passed to <c>SubscribeOn</c> for the resulting sequence.</param>
        /// <returns>An async observable sequence that forwards each handler invocation to its observers.</returns>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public static IAsyncObservable<TEventArgs> FromEvent<TEventArgs>(Action<Action<TEventArgs>> addHandler, Action<Action<TEventArgs>> removeHandler, IAsyncScheduler scheduler)
        {
            if (addHandler == null)
                throw new ArgumentNullException(nameof(addHandler));
            if (removeHandler == null)
                throw new ArgumentNullException(nameof(removeHandler));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return FromEventCore<Action<TEventArgs>, TEventArgs>(h => h, addHandler, removeHandler, scheduler);
        }

        /// <summary>
        /// Wraps an event with a custom delegate type in an async observable sequence. The internal
        /// <see cref="Action{T}"/> handler is rebound to <typeparamref name="TDelegate"/> through
        /// <see cref="Delegate.CreateDelegate(Type, object, System.Reflection.MethodInfo)"/>, so
        /// <typeparamref name="TDelegate"/> must be a delegate type whose signature is compatible with it.
        /// The scheduler is chosen from <see cref="SynchronizationContext.Current"/> at the time this method is called.
        /// </summary>
        /// <typeparam name="TDelegate">Delegate type of the underlying event.</typeparam>
        /// <typeparam name="TEventArgs">Type of the value forwarded to observers.</typeparam>
        /// <param name="addHandler">Action that attaches the given handler to the underlying event.</param>
        /// <param name="removeHandler">Action that detaches the given handler from the underlying event.</param>
        /// <returns>An async observable sequence that forwards each handler invocation to its observers.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="addHandler"/> or <paramref name="removeHandler"/> is null.</exception>
        public static IAsyncObservable<TEventArgs> FromEvent<TDelegate, TEventArgs>(Action<TDelegate> addHandler, Action<TDelegate> removeHandler)
        {
            if (addHandler == null)
                throw new ArgumentNullException(nameof(addHandler));
            if (removeHandler == null)
                throw new ArgumentNullException(nameof(removeHandler));

            return FromEventCore<TDelegate, TEventArgs>(h => ConvertDelegate<Action<TEventArgs>, TDelegate>(h), addHandler, removeHandler, GetSchedulerForCurrentContext());
        }

        /// <summary>
        /// Wraps an event with a custom delegate type in an async observable sequence, using the specified
        /// scheduler for the subscription logic. The internal <see cref="Action{T}"/> handler is rebound to
        /// <typeparamref name="TDelegate"/> through
        /// <see cref="Delegate.CreateDelegate(Type, object, System.Reflection.MethodInfo)"/>.
        /// </summary>
        /// <typeparam name="TDelegate">Delegate type of the underlying event.</typeparam>
        /// <typeparam name="TEventArgs">Type of the value forwarded to observers.</typeparam>
        /// <param name="addHandler">Action that attaches the given handler to the underlying event.</param>
        /// <param name="removeHandler">Action that detaches the given handler from the underlying event.</param>
        /// <param name="scheduler">Scheduler passed to <c>SubscribeOn</c> for the resulting sequence.</param>
        /// <returns>An async observable sequence that forwards each handler invocation to its observers.</returns>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public static IAsyncObservable<TEventArgs> FromEvent<TDelegate, TEventArgs>(Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IAsyncScheduler scheduler)
        {
            if (addHandler == null)
                throw new ArgumentNullException(nameof(addHandler));
            if (removeHandler == null)
                throw new ArgumentNullException(nameof(removeHandler));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return FromEventCore<TDelegate, TEventArgs>(h => ConvertDelegate<Action<TEventArgs>, TDelegate>(h), addHandler, removeHandler, scheduler);
        }

        /// <summary>
        /// Wraps a parameterless event (handlers of type <see cref="Action"/>) in an async observable sequence
        /// that produces <c>Unit.Default</c> for every invocation.
        /// The scheduler is chosen from <see cref="SynchronizationContext.Current"/> at the time this method is called.
        /// </summary>
        /// <param name="addHandler">Action that attaches the given handler to the underlying event.</param>
        /// <param name="removeHandler">Action that detaches the given handler from the underlying event.</param>
        /// <returns>An async observable sequence that emits <c>Unit.Default</c> each time the handler is invoked.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="addHandler"/> or <paramref name="removeHandler"/> is null.</exception>
        public static IAsyncObservable<Unit> FromEvent(Action<Action> addHandler, Action<Action> removeHandler)
        {
            if (addHandler == null)
                throw new ArgumentNullException(nameof(addHandler));
            if (removeHandler == null)
                throw new ArgumentNullException(nameof(removeHandler));

            // Adapt the Unit-taking handler to a parameterless Action.
            return FromEventCore<Action, Unit>(h => () => h(Unit.Default), addHandler, removeHandler, GetSchedulerForCurrentContext());
        }

        /// <summary>
        /// Wraps a parameterless event (handlers of type <see cref="Action"/>) in an async observable sequence
        /// that produces <c>Unit.Default</c> for every invocation, using the specified scheduler for the
        /// subscription logic.
        /// </summary>
        /// <param name="addHandler">Action that attaches the given handler to the underlying event.</param>
        /// <param name="removeHandler">Action that detaches the given handler from the underlying event.</param>
        /// <param name="scheduler">Scheduler passed to <c>SubscribeOn</c> for the resulting sequence.</param>
        /// <returns>An async observable sequence that emits <c>Unit.Default</c> each time the handler is invoked.</returns>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public static IAsyncObservable<Unit> FromEvent(Action<Action> addHandler, Action<Action> removeHandler, IAsyncScheduler scheduler)
        {
            if (addHandler == null)
                throw new ArgumentNullException(nameof(addHandler));
            if (removeHandler == null)
                throw new ArgumentNullException(nameof(removeHandler));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return FromEventCore<Action, Unit>(h => () => h(Unit.Default), addHandler, removeHandler, scheduler);
        }

        /// <summary>
        /// Wraps an event with a custom delegate type in an async observable sequence, using a caller-supplied
        /// function to turn the internal <see cref="Action{T}"/> handler into the event's delegate type.
        /// The scheduler is chosen from <see cref="SynchronizationContext.Current"/> at the time this method is called.
        /// </summary>
        /// <typeparam name="TDelegate">Delegate type of the underlying event.</typeparam>
        /// <typeparam name="TEventArgs">Type of the value forwarded to observers.</typeparam>
        /// <param name="conversion">Function that converts the internal handler into a <typeparamref name="TDelegate"/>.</param>
        /// <param name="addHandler">Action that attaches the converted handler to the underlying event.</param>
        /// <param name="removeHandler">Action that detaches the converted handler from the underlying event.</param>
        /// <returns>An async observable sequence that forwards each handler invocation to its observers.</returns>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public static IAsyncObservable<TEventArgs> FromEvent<TDelegate, TEventArgs>(Func<Action<TEventArgs>, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler)
        {
            if (conversion == null)
                throw new ArgumentNullException(nameof(conversion));
            if (addHandler == null)
                throw new ArgumentNullException(nameof(addHandler));
            if (removeHandler == null)
                throw new ArgumentNullException(nameof(removeHandler));

            return FromEventCore<TDelegate, TEventArgs>(conversion, addHandler, removeHandler, GetSchedulerForCurrentContext());
        }

        /// <summary>
        /// Wraps an event with a custom delegate type in an async observable sequence, using a caller-supplied
        /// function to turn the internal <see cref="Action{T}"/> handler into the event's delegate type and
        /// the specified scheduler for the subscription logic.
        /// </summary>
        /// <typeparam name="TDelegate">Delegate type of the underlying event.</typeparam>
        /// <typeparam name="TEventArgs">Type of the value forwarded to observers.</typeparam>
        /// <param name="conversion">Function that converts the internal handler into a <typeparamref name="TDelegate"/>.</param>
        /// <param name="addHandler">Action that attaches the converted handler to the underlying event.</param>
        /// <param name="removeHandler">Action that detaches the converted handler from the underlying event.</param>
        /// <param name="scheduler">Scheduler passed to <c>SubscribeOn</c> for the resulting sequence.</param>
        /// <returns>An async observable sequence that forwards each handler invocation to its observers.</returns>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public static IAsyncObservable<TEventArgs> FromEvent<TDelegate, TEventArgs>(Func<Action<TEventArgs>, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IAsyncScheduler scheduler)
        {
            if (conversion == null)
                throw new ArgumentNullException(nameof(conversion));
            if (addHandler == null)
                throw new ArgumentNullException(nameof(addHandler));
            if (removeHandler == null)
                throw new ArgumentNullException(nameof(removeHandler));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return FromEventCore<TDelegate, TEventArgs>(conversion, addHandler, removeHandler, scheduler);
        }

        /// <summary>
        /// Shared implementation behind all <c>FromEvent</c> overloads. Arguments are assumed to have been
        /// validated by the public entry points; no null checks are performed here.
        /// </summary>
        /// <typeparam name="TDelegate">Delegate type of the underlying event.</typeparam>
        /// <typeparam name="TEventArgs">Type of the value forwarded to observers.</typeparam>
        /// <param name="conversion">Function that converts the internal handler into a <typeparamref name="TDelegate"/>.</param>
        /// <param name="addHandler">Action that attaches the converted handler to the underlying event.</param>
        /// <param name="removeHandler">Action that detaches the converted handler from the underlying event.</param>
        /// <param name="scheduler">Scheduler handed to <c>SynchronizeEvents</c>.</param>
        /// <returns>The event sequence after being passed through <c>SynchronizeEvents</c>.</returns>
        private static IAsyncObservable<TEventArgs> FromEventCore<TDelegate, TEventArgs>(Func<Action<TEventArgs>, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IAsyncScheduler scheduler)
        {
            return
                SynchronizeEvents(
                    Create<TEventArgs>(observer =>
                    {
                        // NOTE(review): an async lambda bound to Action<T> is "async void". The code raising
                        // the event neither awaits OnNextAsync nor observes exceptions thrown by it.
                        var handler = new Action<TEventArgs>(async e =>
                        {
                            await observer.OnNextAsync(e).ConfigureAwait(false);
                        });

                        // Convert once and keep the instance: the same delegate object that was added
                        // is the one handed to removeHandler when the subscription is disposed.
                        var converted = conversion(handler);

                        addHandler(converted);

                        return new ValueTask<IAsyncDisposable>(AsyncDisposable.Create(() =>
                        {
                            removeHandler(converted);

                            return default;
                        }));
                    }),
                    scheduler
                );
        }

        /// <summary>
        /// Picks a scheduler based on the ambient <see cref="SynchronizationContext"/> of the calling thread.
        /// </summary>
        /// <returns>
        /// A <c>SynchronizationContextAsyncScheduler</c> wrapping <see cref="SynchronizationContext.Current"/>
        /// when one is present; otherwise <c>ImmediateAsyncScheduler.Instance</c>.
        /// </returns>
        private static IAsyncScheduler GetSchedulerForCurrentContext()
        {
            var context = SynchronizationContext.Current;

            if (context != null)
            {
                return new SynchronizationContextAsyncScheduler(context);
            }

            return ImmediateAsyncScheduler.Instance;
        }

        /// <summary>
        /// Applies <c>SubscribeOn(scheduler)</c>, then <c>Publish()</c>, then <c>RefCount()</c> to the source.
        /// </summary>
        /// <typeparam name="T">Element type of the sequence.</typeparam>
        /// <param name="source">Sequence created from the event.</param>
        /// <param name="scheduler">Scheduler passed to <c>SubscribeOn</c>.</param>
        /// <returns>The composed sequence.</returns>
        private static IAsyncObservable<T> SynchronizeEvents<T>(this IAsyncObservable<T> source, IAsyncScheduler scheduler)
        {
            return source.SubscribeOn(scheduler).Publish().RefCount();
        }

        /// <summary>
        /// Rebinds a delegate instance to another delegate type by creating a <typeparamref name="TTo"/>
        /// delegate over the <c>Invoke</c> method of <paramref name="o"/>.
        /// </summary>
        /// <typeparam name="TFrom">Type of the existing delegate; its <c>Invoke</c> method is looked up by reflection.</typeparam>
        /// <typeparam name="TTo">Delegate type to create.</typeparam>
        /// <param name="o">Existing delegate instance used as the target of the new delegate.</param>
        /// <returns>A <typeparamref name="TTo"/> delegate that calls <c>o.Invoke</c>.</returns>
        private static TTo ConvertDelegate<TFrom, TTo>(TFrom o)
        {
            var invokeMethod = typeof(TFrom).GetMethod("Invoke");
            return (TTo)(object)Delegate.CreateDelegate(typeof(TTo), o, invokeMethod);
        }
    }
}