FromEvent.cs 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. #nullable disable
  5. using System.Diagnostics;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Disposables;
  8. using System.Reactive.Subjects;
  9. //
  10. // BREAKING CHANGE v2 > v1.x - FromEvent[Pattern] now has an implicit SubscribeOn and Publish operation.
  11. //
  12. // The free-threaded nature of Rx is key to the performance characteristics of the event processing
  13. // pipeline. However, in places where we bridge with the external world, this sometimes has negative
  14. // effects due to thread-affine operations involved. The FromEvent[Pattern] bridges are one such
  15. // place where we reach out to add and remove operations on events.
  16. //
  17. // Consider the following piece of code, assuming Rx v1.x usage:
  18. //
  19. // var txt = Observable.FromEventPattern(txtInput, "TextChanged");
  20. // var res = from term in txt
  21. // from word in svc.Lookup(term).TakeUntil(txt)
  22. // select word;
  23. //
  24. // This code is flawed for various reasons. Seasoned Rx developers will immediately suggest usage of
  25. // the Publish operator to share the side-effects of subscribing to the txt sequence, resulting in
  26. // only one subscription to the event:
  27. //
  28. // var txt = Observable.FromEventPattern(txtInput, "TextChanged");
  29. // var res = txt.Publish(txt_ => from term in txt_
  30. // from word in svc.Lookup(term).TakeUntil(txt_)
  31. // select word);
  32. //
  33. // Customers are typically confused as to why FromEvent[Pattern] causes multiple handlers to be added
  34. // to the underlying event. This is in contrast with other From* bridges which involve the use of a
  35. // subject (e.g. FromAsyncPattern, FromAsync, and ToObservable on Task<T>).
  36. //
  37. // But there are more issues with the code fragment above. Upon completion of the svc.Lookup(term)
  38. // sequence, TakeUntil will unsubscribe from both sequences, causing the unsubscription to happen in
  39. // the context of the source's OnCompleted, which may be the thread pool. Some thread-affine events
  40. // don't quite like this. In UI frameworks like WPF and Silverlight, this turns out to be not much of
  41. // a problem typically, but it's merely an accident things work out. From an e-mail conversation with
  42. // the WPF/SL/Jupiter experts:
  43. //
  44. // "Unfortunately, as I expected, it’s confusing, and implementation details are showing through.
  45. // The bottom line is that event add/remove should always be done on the right thread.
  46. //
  47. // Where events are implemented with compiler-generated code, i.e. MultiCastDelegate, the add/remove
  48. // will be thread safe/agile. Where events are implemented in custom code, across Wpf/SL/WP/Jupiter,
  49. // the add/remove are expected to happen on the Dispatcher thread.
  50. //
  51. // Jupiter actually has the consistent story here, where all the event add/remove implementations do
  52. // the thread check. It should still be a “wrong thread” error, though, not an AV.
  53. //
  54. // In SL there’s a mix of core events (which do the thread check) and framework events (which use
  55. // compiler-generated event implementations). So you get an exception if you unhook Button.Loaded
  56. // from off thread, but you don’t get an exception if you unhook Button.Click.
  57. //
  58. // In WPF there’s a similar mix (some events are compiler-generated and some use the EventHandlerStore).
  59. // But I don’t see any thread safety or thread check in the EventHandlerStore. So while it works, IIUC,
  60. // it should have race conditions and corruptions."
  61. //
  62. // Starting with "Jupiter" (Windows XAML aka "Metro"), checks are added to ensure the add and remove
  63. // operations for UI events are called from the UI thread. As a result, the dictionary suggest sample
  64. // code shown above starts to fail. A possible fix is to use SubscribeOnDispatcher:
  65. //
  66. // var txt = Observable.FromEventPattern(txtInput, "TextChanged").SubscribeOnDispatcher();
  67. // var res = from term in txt
  68. // from word in svc.Lookup(term).TakeUntil(txt)
  69. // select word;
  70. //
  71. // This fix has two problems:
  72. //
  73. // 1. Customers often don't quite understand the difference between ObserveOn and SubscribeOn. In fact,
  74. // we've given guidance that use of the latter is typically indicative of a misunderstanding, and
  75. // is used rarely. Also, the fragment above would likely be extended with some UI binding code where
  76. // one needs to use ObserveOnDispatcher, so the combination of both becomes even more confusing.
  77. //
  78. // 2. There's a subtle race condition now. Upon receiving a new term from the txt sequence, SelectMany's
  79. // invocation of the result selector involves TakeUntil subscribing to txt again. However, the use
  80. // of SubscribeOnDispatcher means the subscription is now happening asynchronously, leaving a time
  81. // gap between returning from Subscribe and doing the += on the underlying event:
  82. //
  83. // (Subscription of TakeUntil to txt)
  84. // |
  85. // v
  86. // txt --------------------------------------------------------------
  87. // |
  88. // +-----...----+ (SubscribeOnDispatcher's post of Subscribe)
  89. // |
  90. // TextChanged ------"re"---------"rea"-------------"reac"-----"react"----...
  91. // ^
  92. // |
  93. // (where += on the event happens)
  94. //
  95. // While this problem is rare and sometimes gets mitigated by accident because code is posting back
  96. // to e.g. the UI message loop, it's extremely hard to debug when things go wrong.
  97. //
  98. // In order to fix this behavior such that code has the expected behavior, we do two things in Rx v2.0:
  99. //
  100. // - To solve the cross-thread add/remove handler operations and make them single-thread affine, we
  101. // now do an implicit SubscribeOn with the SynchronizationContext.Current retrieved eagerly upon
  102. // calling FromEvent[Pattern]. This goes hand-in-hand with a recommendation:
  103. //
  104. // "Always call FromEvent[Pattern] in a place where you'd normally write += and -= operations
  105. // yourself. Don't inline the creation of a FromEvent[Pattern] object inside a query."
  106. //
  107. // This recommendation helps to keep code clean (bridging operations are moved outside queries) and
  108. // ensures the captured SynchronizationContext is the least surprising one. E.g. in the sample code
  109. // above, the whole query likely lives in a button_Click handler or so.
  110. //
  111. // - To solve the time gap issue, we now add implicit Publish behavior with ref-counted behavior. In
  112. // other words, the new FromEvent[Pattern] is pretty much the same as:
  113. //
  114. // Observable_v2.FromEvent[Pattern](<args>)
  115. // ==
  116. // Observable_v1.FromEvent[Pattern](<args>).SubscribeOn(SynchronizationContext.Current)
  117. // .Publish()
  118. // .RefCount()
  119. //
  120. // Overloads to FromEvent[Pattern] allow specifying the scheduler used for the SubscribeOn operation
  121. // that's taking place internally. When omitted, a SynchronizationContextScheduler will be supplied
  122. // if a current SynchronizationContext is found. If no current SynchronizationContext is found, the
  123. // default scheduler is the immediate scheduler, falling back to the free-threaded behavior we had
  124. // before in v1.x. (See GetSchedulerForCurrentContext in QueryLanguage.Events.cs).
  125. //
  126. // Notice a time gap can still occur at the point of the first subscription to the event sequence,
  127. // or when the ref count fell back to zero. In cases of nested uses of the sequence (such as in the
  128. // running example here), this is fine because the top-level subscription is kept alive for the whole
  129. // duration. In other cases, there's already a race condition between the underlying event and the
  130. // observable wrapper (assuming events are hot). For cold events that have side-effects upon add and
  131. // remove handler operations, use of Observable.Create is recommended. This should be rather rare,
  132. // as most events follow the typical MulticastDelegate implementation pattern:
  133. //
  134. // public event EventHandler<BarEventArgs> Bar;
  135. //
  136. // protected void OnBar(int value)
  137. // {
  138. // var bar = Bar;
  139. // if (bar != null)
  140. // bar(this, new BarEventArgs(value));
  141. // }
  142. //
  143. // In here, there's already a race between the user hooking up an event handler through the += add
  144. // operation and the event producer (possibly on a different thread) calling OnBar. It's also worth
  145. // pointing out that this race condition is mitigated by a check in SynchronizationContextScheduler
  146. // causing synchronous execution in case the caller is already on the target SynchronizationContext.
  147. // This situation is common when using FromEvent[Pattern] immediately after declaring it, e.g. in
  148. // the context of a UI event handler.
  149. //
  150. // Finally, notice we can't simply connect the event to a Subject<T> upon a FromEvent[Pattern] call,
  151. // because this would make it impossible to get rid of this one event handler (unless we expose some
  152. // other means of resource maintenance, e.g. by making the returned object implement IDisposable).
  153. // Also, this would cause the event producer to see the event's delegate in a non-null state all the
  154. // time, causing event argument objects to be newed up, possibly sending those into a zero-observer
  155. // subject (which is opaque to the event producer). Not to mention that the subject would always be
  156. // rooted by the target event (even when the FromEvent[Pattern] observable wrapper is unreachable).
  157. //
  158. namespace System.Reactive.Linq.ObservableImpl
  159. {
  /// <summary>
  /// Event producer that turns an <see cref="Action{TEventArgs}"/> callback into the
  /// event's delegate type, either through a caller-supplied conversion function or,
  /// when none was supplied, through <c>ReflectionUtils.CreateDelegate</c>.
  /// </summary>
  /// <typeparam name="TDelegate">Delegate type of the underlying event.</typeparam>
  /// <typeparam name="TEventArgs">Type of the values pushed to observers.</typeparam>
  internal sealed class FromEvent<TDelegate, TEventArgs> : ClassicEventProducer<TDelegate, TEventArgs>
  {
      // Optional conversion from the onNext callback to the event's delegate type.
      // Left null by the constructor overload that does not take a conversion.
      private readonly Func<Action<TEventArgs>, TDelegate> _conversion;

      /// <summary>
      /// Creates a producer without a conversion function; the handler delegate is
      /// built via <c>ReflectionUtils.CreateDelegate</c> in <see cref="GetHandler"/>.
      /// </summary>
      /// <param name="addHandler">Action that attaches a handler to the event.</param>
      /// <param name="removeHandler">Action that detaches a handler from the event.</param>
      /// <param name="scheduler">Scheduler handed to the base class.</param>
      public FromEvent(Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IScheduler scheduler)
          : base(addHandler, removeHandler, scheduler)
      {
      }

      /// <summary>
      /// Creates a producer that uses <paramref name="conversion"/> to build the
      /// handler delegate.
      /// </summary>
      /// <param name="conversion">Function mapping the onNext callback to a <typeparamref name="TDelegate"/>.</param>
      /// <param name="addHandler">Action that attaches a handler to the event.</param>
      /// <param name="removeHandler">Action that detaches a handler from the event.</param>
      /// <param name="scheduler">Scheduler handed to the base class.</param>
      public FromEvent(Func<Action<TEventArgs>, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IScheduler scheduler)
          : base(addHandler, removeHandler, scheduler)
      {
          _conversion = conversion;
      }

      /// <summary>
      /// Produces the delegate that will be registered with the event.
      /// </summary>
      /// <param name="onNext">Callback that receives each event argument.</param>
      /// <returns>A <typeparamref name="TDelegate"/> wrapping <paramref name="onNext"/>.</returns>
      protected override TDelegate GetHandler(Action<TEventArgs> onNext)
      {
          // An explicit conversion always wins when one was supplied.
          if (_conversion != null)
          {
              return _conversion(onNext);
          }

          // No conversion: bind TDelegate to the Invoke method of the callback itself.
          var invokeMethod = typeof(Action<TEventArgs>).GetMethod(nameof(Action<TEventArgs>.Invoke));

          return ReflectionUtils.CreateDelegate<TDelegate>(onNext, invokeMethod);
      }
  }
  /// <summary>
  /// Base class for producers that bridge an event to observers. All observers of one
  /// producer share a single event handler through a ref-counted <see cref="Session"/>;
  /// adding and removing that handler is scheduled on the scheduler passed to the
  /// constructor.
  /// </summary>
  /// <typeparam name="TDelegate">Delegate type of the underlying event.</typeparam>
  /// <typeparam name="TArgs">Type of the values pushed to observers.</typeparam>
  internal abstract class EventProducer<TDelegate, TArgs> : BasicProducer<TArgs>
  {
      private readonly IScheduler _scheduler;

      // Guards _session and all state inside the current Session.
      private readonly object _gate;

      // The active session, or null when no observer is connected.
      private Session _session;

      /// <summary>
      /// Initializes the producer.
      /// </summary>
      /// <param name="scheduler">Scheduler used to run the add and remove handler operations.</param>
      protected EventProducer(IScheduler scheduler)
      {
          _scheduler = scheduler;
          _gate = new object();
      }

      /// <summary>
      /// Builds the delegate to register with the event from the given callback.
      /// </summary>
      /// <param name="onNext">Callback that receives each event argument.</param>
      /// <returns>The delegate to pass to <see cref="AddHandler"/>.</returns>
      protected abstract TDelegate GetHandler(Action<TArgs> onNext);

      /// <summary>
      /// Registers <paramref name="handler"/> with the underlying event.
      /// </summary>
      /// <param name="handler">Delegate obtained from <see cref="GetHandler"/>.</param>
      /// <returns>A disposable that unregisters the handler again.</returns>
      protected abstract IDisposable AddHandler(TDelegate handler);

      /// <summary>
      /// Connects <paramref name="observer"/> to the shared session, creating the
      /// session first when none is active.
      /// </summary>
      /// <param name="observer">Observer to connect.</param>
      /// <returns>A disposable that disconnects the observer from the session.</returns>
      protected override IDisposable Run(IObserver<TArgs> observer)
      {
          IDisposable connection;

          lock (_gate)
          {
              //
              // A session object holds on to a single handler to the underlying event, feeding
              // into a subject. It also ref counts the number of connections to the subject.
              //
              // When the ref count goes back to zero, the event handler is unregistered, and
              // the session will reach out to reset the _session field to null under the _gate
              // lock. Future subscriptions will cause a new session to be created.
              //
              _session ??= new Session(this);

              connection = _session.Connect(observer);
          }

          return connection;
      }

      /// <summary>
      /// Holds the single event handler shared by all connected observers, forwards
      /// event values through a subject, and counts the connections to that subject.
      /// </summary>
      private sealed class Session
      {
          private readonly EventProducer<TDelegate, TArgs> _parent;
          private readonly Subject<TArgs> _subject;

          // Receives the disposable returned by the parent's AddHandler once the
          // scheduled add operation has run.
          private SingleAssignmentDisposable _removeHandler;

          // Number of live connections; read and written only under _parent._gate.
          private int _count;

          public Session(EventProducer<TDelegate, TArgs> parent)
          {
              _parent = parent;
              _subject = new Subject<TArgs>();
          }

          /// <summary>
          /// Subscribes <paramref name="observer"/> to the session's subject and, for the
          /// first connection, sets up the event handler.
          /// </summary>
          /// <param name="observer">Observer to connect.</param>
          /// <returns>
          /// A disposable that disconnects the observer, or an empty disposable when
          /// initialization failed (the failure is reported through <c>OnError</c>).
          /// </returns>
          public IDisposable Connect(IObserver<TArgs> observer)
          {
              /*
               * CALLERS - Ensure this is called under the lock!
               *
              lock (_parent._gate) */
              {
                  //
                  // We connect the given observer to the subject first, before performing any kind
                  // of initialization which will register an event handler. This is done to ensure
                  // we don't have a time gap between adding the handler and connecting the user's
                  // observer, e.g. when the ImmediateScheduler is used.
                  //
                  // [OK] Use of unsafe Subscribe: called on a known subject implementation.
                  //
                  var connection = _subject.Subscribe/*Unsafe*/(observer);

                  if (++_count == 1)
                  {
                      try
                      {
                          Initialize();
                      }
                      catch (Exception exception)
                      {
                          connection.Dispose();

                          //
                          // Initialization failed part-way, so _removeHandler may already be
                          // assigned. Drop this session so that the next subscription starts
                          // from a fresh one; otherwise it would call Initialize again on this
                          // instance and trip the Debug.Assert on _removeHandler. This is done
                          // before notifying the observer, in case it resubscribes from within
                          // OnError. The session is kept if the count did not return to zero,
                          // i.e. if some other connection was made while Initialize was running.
                          //
                          if (--_count == 0)
                          {
                              _parent._session = null;
                          }

                          observer.OnError(exception);
                          return Disposable.Empty;
                      }
                  }

                  return Disposable.Create(
                      (this, _parent, connection),
                      tuple =>
                      {
                          var (@this, closureParent, closureConnection) = tuple;

                          closureConnection.Dispose();

                          lock (closureParent._gate)
                          {
                              if (--@this._count == 0)
                              {
                                  // Last connection gone: schedule removal of the event handler
                                  // and let the next subscription create a new session.
                                  closureParent._scheduler.ScheduleAction(@this._removeHandler, static handler => handler.Dispose());
                                  closureParent._session = null;
                              }
                          }
                      });
              }
          }

          /// <summary>
          /// Creates the handler delegate and schedules its registration with the event.
          /// </summary>
          private void Initialize()
          {
              /*
               * CALLERS - Ensure this is called under the lock!
               *
              lock (_parent._gate) */
              {
                  //
                  // When the ref count goes to zero, no-one should be able to perform operations on
                  // the session object anymore, because it gets nulled out.
                  //
                  Debug.Assert(_removeHandler == null);
                  _removeHandler = new SingleAssignmentDisposable();

                  //
                  // Conversion code is supposed to be a pure function and shouldn't be run on the
                  // scheduler, but the add handler call should. Notice the scheduler can be the
                  // ImmediateScheduler, causing synchronous invocation. This is the default when
                  // no SynchronizationContext is found (see QueryLanguage.Events.cs and search for
                  // the GetSchedulerForCurrentContext method).
                  //
                  var onNext = _parent.GetHandler(_subject.OnNext);
                  _parent._scheduler.ScheduleAction(onNext, AddHandler);
              }
          }

          /// <summary>
          /// Registers <paramref name="onNext"/> with the event through the parent and
          /// stores the resulting remove-handler disposable. A failure while adding the
          /// handler is sent to the subject's <c>OnError</c>.
          /// </summary>
          /// <param name="onNext">Handler delegate created in <see cref="Initialize"/>.</param>
          private void AddHandler(TDelegate onNext)
          {
              IDisposable removeHandler;

              try
              {
                  removeHandler = _parent.AddHandler(onNext);
              }
              catch (Exception exception)
              {
                  _subject.OnError(exception);
                  return;
              }

              //
              // We don't propagate the exception to the OnError channel upon Dispose. This is
              // not possible at this stage, because we've already auto-detached in the base
              // class Producer implementation. Even if we would switch the OnError and auto-
              // detach calls, it wouldn't work because the remove handler logic is scheduled
              // on the given scheduler, causing asynchrony. We can't block waiting for the
              // remove handler to run on the scheduler.
              //
              _removeHandler.Disposable = removeHandler;
          }
      }
  }
  /// <summary>
  /// Event producer whose add and remove operations are supplied as a pair of
  /// delegates taking the event's handler.
  /// </summary>
  /// <typeparam name="TDelegate">Delegate type of the underlying event.</typeparam>
  /// <typeparam name="TArgs">Type of the values pushed to observers.</typeparam>
  internal abstract class ClassicEventProducer<TDelegate, TArgs> : EventProducer<TDelegate, TArgs>
  {
      private readonly Action<TDelegate> _addHandler;
      private readonly Action<TDelegate> _removeHandler;

      /// <summary>
      /// Stores the add/remove delegates and forwards the scheduler to the base class.
      /// </summary>
      /// <param name="addHandler">Action that attaches a handler to the event.</param>
      /// <param name="removeHandler">Action that detaches a handler from the event.</param>
      /// <param name="scheduler">Scheduler handed to the base class.</param>
      protected ClassicEventProducer(Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IScheduler scheduler)
          : base(scheduler)
      {
          _addHandler = addHandler;
          _removeHandler = removeHandler;
      }

      /// <summary>
      /// Attaches <paramref name="handler"/> using the add delegate.
      /// </summary>
      /// <param name="handler">Handler to attach to the event.</param>
      /// <returns>A disposable that invokes the remove delegate with the same handler.</returns>
      protected override IDisposable AddHandler(TDelegate handler)
      {
          _addHandler(handler);

          // Pass the remove delegate and handler as explicit state so that the
          // dispose callback does not need to capture anything.
          var state = (remove: _removeHandler, handler);

          return Disposable.Create(state, static s => s.remove(s.handler));
      }
  }
  343. }