소스 검색

Add ResetExceptionDispatchState operator and document exception reuse limitations (#2237)

This also updates the UWP hack to use 10.0.26100 SDK since the the Azure DevOps Windows 2025 agents don't have the 10.0.19041 SDK we had previously been relying on installed but they do have 26100.

Also fix build issue that seems to have started with .NET SDK 10 in which the Reverse extension method for Span<char> takes precedence over the LINQ operator.
Ian Griffiths 2 주 전
부모
커밋
1ffbcad513

+ 2 - 2
Rx.NET/Documentation/ReleaseHistory/Rx.v6.md

@@ -4,9 +4,9 @@
 
 This release adds:
 
-* A `DisposeWith`extension method for `IDisposable` to simplify disposal in conjunction with `CompositeDisposable` (see [#2178](https://github.com/dotnet/reactive/pull/2178) thanks to [Chris Pulman](https://github.com/ChrisPulman)
+* A `DisposeWith` extension method for `IDisposable` to simplify disposal in conjunction with `CompositeDisposable` (see [#2178](https://github.com/dotnet/reactive/pull/2178) thanks to [Chris Pulman](https://github.com/ChrisPulman)
 * A new overload of `TakeUntil` accepting a `CancellationToken` (see [#2181](https://github.com/dotnet/reactive/issues/2181) thanks to [Nils Aufschläger](https://github.com/nilsauf)
-
+* A new `ResetExceptionDispatchState` operator for use where a source that will provide the same `Exception` instance multiple times over (e.g., the `Throw` or `Repeat` operators) will be used in conjunction with a mechanism that turns `OnError` notifications into actual exceptions (e.g., the `await` support) to avoid the problem described in [#2187](https://github.com/dotnet/reactive/issues/2187) in which the exception's `StackTrace` gets longer and longer with each rethrowing of the exception
 
 ## v6.0.2
 

+ 3 - 3
Rx.NET/Documentation/adr/0003-uap-targets.md

@@ -77,7 +77,7 @@ The following versions are of interest. The comments about the status of the lat
 * `10.0.19045`: Windows 10 22H2, the last ever version of Windows 10 (support ends October 2025)
 * `10.0.22621`: the oldest Windows 11 version (22H2) still in GA support (enterprise only; support ends October 2025)
 * `10.0.22631`: the oldest Windows 11 version (23H2) with GA support for Home, Pro and Education (non-enterprise servicing ends November 2025; enterprise servicing ends November 2026)
-* `10.0.26100`: the latest version of Windows (24H2)
+* `10.0.26100`: the latest version of Windows (24H2), also the version installed on the `windows-2025` Azure DevOps hosted build images
 
 So as it happens, we don't technically need anything newer than 10.0.17763. So we could specify that as the minimum platform version. However, there's no compelling reason to do this, and since 10.0.18362 is as far back as the current tooling fully understands, and is the version Rx 6.0 has always targetted, it makes sense to continue with that.
 
@@ -161,12 +161,12 @@ The provides references to the .NET runtime library components. (So this provide
 So this enables normal .NET code to compile. However, Rx.NET also includes code that uses some UWP-specific APIs. (After all, a large part of the issue we're dealing with here exists because of features like schedulers that support UWP dispatchers.) And for that to work, the compiler needs access to `.winmd` files with the metadata for these APIs. So we have this:
 
 ```xml
-<ReferencePath Include="$(TargetPlatformSdkPath)UnionMetadata\10.0.19041.0\Windows.winmd" />
+<ReferencePath Include="$(TargetPlatformSdkPath)UnionMetadata\10.0.26100.0\Windows.winmd" />
 ```
 
 This relies on the `TargetPlatformSdkPath` build variable being set. When building locally (either in Visual Studio, or with `dotnet build` from the command line) this variable is set correctly, but for some reason it doesn't seem to be set on the build agents. So we set this as an environment variable in the `azure-pipelines.rx.yml` build pipeline definition.
 
-You might be wondering about that 19041 in there. Why is that not 18362, consistent with the TFM? This is because, as mentioned earlier, Azure DevOps Windows build agents have only certain Windows SDK versions installed. They don't have 18362. but they do have the 19041 version, and we can use that to target `10.0.18362`.
+You might be wondering about that 26100 in there. Why is that not 18362, consistent with the TFM? This is because, as mentioned earlier, Azure DevOps Windows build agents have only certain Windows SDK versions installed. They don't have 18362. but the `windows-2025` image does have the 26100 version, and we can use that to target `10.0.18362`.
 
 
 #### Prevent Over-Zealous WinRT Interop Code Generation

+ 227 - 0
Rx.NET/Documentation/adr/0004-onerror-to-throw.md

@@ -0,0 +1,227 @@
+# Rules for when OnError notifications become thrown exceptions
+
+Rx uses .NET `Exception` objects in a slightly unusual way: they are typically not thrown. Instead they are passed as the argument to an observer's `OnError` method. There are some situations in which an error reported in this way will end up causing an exception to be thrown. For example, if you `await` an `IObservable<T>` that calls `OnError`, the `await` will throw:
+
+```cs
+IObservable<int> ts = Observable.Throw<int>(new Exception("Pow!"));
+
+await ts; // Exception thrown here
+```
+
+This can cause problems. For example, as [#2187](https://github.com/dotnet/reactive/issues/2187) describes, if you `await` the observable shown in this example multiple times, the exception's `StackTrace` gets longer each time.
+
+Problems arise because the use of singleton exception objects is slightly tricky even with straightforward use of `throw`, but it becomes a good deal more subtle when you start to 'cross the streams' of normal .NET exception handling and Rx's use of `Exception` in `OnError`.
+
+Rx has never previously offered any guidance that would enable a developer to understand that the code shown above might have problems. The purpose of this ADR is to establish suitable rules.
+
+## Status
+
+Proposed
+
+
+## Authors
+
+@idg10 ([Ian Griffiths](https://endjin.com/who-we-are/our-people/ian-griffiths/)).
+
+
+
+## Context
+
+Exceptions may appear to be ordinary .NET objects, but they get special handling from the runtime. MSIL has a [`throw`](https://learn.microsoft.com/en-us/dotnet/api/system.reflection.emit.opcodes.throw?view=net-9.0) instruction for the purpose of raising exceptions, and there is C++ code inside the CLR that directly manipulates fields defined by the `Exception` class. Certain expectations around the use of exception types are baked deeply into the runtime.
+
+Rx does not generally use exceptions in the way the runtime expects. In particular it does not normally use the MSIL `throw` instruction to raise an exception. Instead, when an Rx `IObservable<T>` wants to report an error, it just passes an exception object as an argument to the `IObserver<T>.OnError` method.
+
+This causes no problems when an application remains entirely within Rx's world. But when we want to move into the more conventional .NET approach of throwing exceptions, it raises an interesting question: where should the exception appear to originate from? 
+
+Consider this example:
+
+```cs
+IObservable<string> fileLines = Observable.Create<string>(async obs =>
+{
+    using var reader = new StreamReader(@"c:\temp\test.txt");
+
+    while ((await reader.ReadLineAsync()) is string line)
+    {
+        obs.OnNext(line);
+    }
+});
+
+string firstNonEmptyLine = await fileLines
+    .FirstAsync(line => line.Length > 0);
+Console.WriteLine(firstNonEmptyLine);
+```
+
+If the attempt to open the file throws an exception, what do we expect to see? A developer familiar with how exceptions generally work with `async` in .NET might reasonably expect the exception to report two stack traces: one for the point at which the exception was originally thrown, and another for where it was rethrown from the `await`. And that's exactly what we see:
+
+```
+Unhandled exception. System.IO.FileNotFoundException: Could not find file 'c:\temp\test.txt'.
+File name: 'c:\temp\test.txt'
+   at Microsoft.Win32.SafeHandles.SafeFileHandle.CreateFile(String fullPath, FileMode mode, FileAccess access, FileShare share, FileOptions options)
+   at Microsoft.Win32.SafeHandles.SafeFileHandle.Open(String fullPath, FileMode mode, FileAccess access, FileShare share, FileOptions options, Int64 preallocationSize, Nullable`1 unixCreateMode)
+   at System.IO.Strategies.OSFileStreamStrategy..ctor(String path, FileMode mode, FileAccess access, FileShare share, FileOptions options, Int64 preallocationSize, Nullable`1 unixCreateMode)
+   at System.IO.Strategies.FileStreamHelpers.ChooseStrategyCore(String path, FileMode mode, FileAccess access, FileShare share, FileOptions options, Int64 preallocationSize, Nullable`1 unixCreateMode)
+   at System.IO.StreamReader.ValidateArgsAndOpenPath(String path, Encoding encoding, Int32 bufferSize)
+   at System.IO.StreamReader..ctor(String path)
+   at Program.<>c.<<<Main>$>b__0_0>d.MoveNext() in D:\source\RxThrowExamples\RxThrowExamples\Program.cs:line 5
+--- End of stack trace from previous location ---
+   at System.Reactive.PlatformServices.ExceptionServicesImpl.Rethrow(Exception exception)
+   at System.Reactive.ExceptionHelpers.Throw(Exception exception)
+   at System.Reactive.Subjects.AsyncSubject`1.GetResult()
+   at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\RxThrowExamples\Program.cs:line 13
+   at Program.<Main>(String[] args)
+   ```
+
+This is straightforward because the exception here is thrown in the conventional .NET manner. It happens to be caught by Rx—this overload of `Observable.Create` wraps the `Task` returned by the callback in an adapter that detects when the `Task` enters a faulted state, in which case it extracts the exception and passes it to the subscribing `IObserver<T>`. And then the awaiter that Rx provides when you `await` an observable rethrows this same exception.
+
+But what about the earlier example in which the exception originated from `Observable.Throw`? In that code, we construct an `Exception` but we never use the `throw` keyword with it, and nor do we invoke any API that might do that for us. What would you expect the call stack to show in that case? In practice we get this:
+
+```
+Unhandled exception. System.Exception: Pow!
+   at System.Reactive.PlatformServices.ExceptionServicesImpl.Rethrow(Exception exception)
+   at System.Reactive.ExceptionHelpers.Throw(Exception exception)
+   at System.Reactive.Subjects.AsyncSubject`1.GetResult()
+   at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\RxThrowExamples\Program.cs:line 20
+   at Program.<Main>(String[] args)
+```
+
+This time, we've got just a single stack trace, effectively showing the `await`. This looks very similar to the 2nd trace from the previous example—the difference here is that we don't have an extra trace showing the original location from which the exception was first thrown. And you could argue that this makes sense: this particular exception wasn't thrown until it emerged from the `await`.
+
+So far so good. But look what happens if we use this same observable source a few times:
+
+```cs
+IObservable<int> ts = Observable.Throw<int>(new Exception("Pow!"));
+
+for (int i = 0; i < 3; ++i)
+{
+    Console.WriteLine();
+    Console.WriteLine();
+
+	try
+	{
+		await ts; // Exception thrown here
+
+	}
+	catch (Exception x)
+	{
+        Console.WriteLine(x);
+	}
+}
+```
+
+Since we're doing the same thing three times, you might expect to see the same exception report three times. But that's not what happens:
+
+```
+System.Exception: Pow!
+   at System.Reactive.PlatformServices.ExceptionServicesImpl.Rethrow(Exception exception)
+   at System.Reactive.ExceptionHelpers.Throw(Exception exception)
+   at System.Reactive.Subjects.AsyncSubject`1.GetResult()
+   at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\RxThrowExamples\Program.cs:line 27
+
+
+System.Exception: Pow!
+   at System.Reactive.PlatformServices.ExceptionServicesImpl.Rethrow(Exception exception)
+   at System.Reactive.ExceptionHelpers.Throw(Exception exception)
+   at System.Reactive.Subjects.AsyncSubject`1.GetResult()
+   at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\RxThrowExamples\Program.cs:line 27
+   at System.Reactive.PlatformServices.ExceptionServicesImpl.Rethrow(Exception exception)
+   at System.Reactive.ExceptionHelpers.Throw(Exception exception)
+   at System.Reactive.Subjects.AsyncSubject`1.GetResult()
+   at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\RxThrowExamples\Program.cs:line 27
+
+
+System.Exception: Pow!
+   at System.Reactive.PlatformServices.ExceptionServicesImpl.Rethrow(Exception exception)
+   at System.Reactive.ExceptionHelpers.Throw(Exception exception)
+   at System.Reactive.Subjects.AsyncSubject`1.GetResult()
+   at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\RxThrowExamples\Program.cs:line 27
+   at System.Reactive.PlatformServices.ExceptionServicesImpl.Rethrow(Exception exception)
+   at System.Reactive.ExceptionHelpers.Throw(Exception exception)
+   at System.Reactive.Subjects.AsyncSubject`1.GetResult()
+   at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\RxThrowExamples\Program.cs:line 27
+   at System.Reactive.PlatformServices.ExceptionServicesImpl.Rethrow(Exception exception)
+   at System.Reactive.ExceptionHelpers.Throw(Exception exception)
+   at System.Reactive.Subjects.AsyncSubject`1.GetResult()
+   at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\RxThrowExamples\Program.cs:line 27
+```
+
+The stack trace gets longer each time!
+
+(This does not accurately reflect the actual runtime behaviour: the call stack does not in fact get deeper. It's just that the `StackTrace` string with which an `Exception` reports this information ends up containing multiple copies of the stack trace.)
+
+This makes no sense.
+
+It occurs as a direct result of the steps Rx takes to produce the stack trace we expect in the earlier example. It uses the .NET runtime library's `ExceptionDispatchInfo.Throw` method to rethrow the exception from the `await`. That method preserves the original context in which the exception was thrown, and appends the context from which it is rethrown: this is how we end up with the multiple stack traces that .NET developers are accustomed to with normal use of `async` and `await`. (In fact, Rx is using exactly the same rethrow mechanism that enables this behaviour in conventional `async` code.) 
+
+This behaviour is not peculiar to Rx. It originates from `ExceptionDispatchInfo.Throw` and we can create a `Task`-based version of this behaviour without using Rx:
+
+```
+Exception ox = new("Kaboom!");
+
+for (int i = 0; i < 3; ++i)
+{
+    Console.WriteLine();
+    Console.WriteLine();
+
+    try
+    {
+        await Task.FromException(ox); // Exception thrown here
+
+    }
+    catch (Exception x)
+    {
+        Console.WriteLine(x);
+    }
+}
+```
+
+The stack traces are shorter, but we see the same repeating behaviour:
+
+```
+System.Exception: Kaboom!
+   at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\ThrowStackDupWithoutRx\Program.cs:line 10
+
+
+System.Exception: Kaboom!
+   at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\ThrowStackDupWithoutRx\Program.cs:line 10
+   at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\ThrowStackDupWithoutRx\Program.cs:line 10
+
+
+System.Exception: Kaboom!
+   at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\ThrowStackDupWithoutRx\Program.cs:line 10
+   at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\ThrowStackDupWithoutRx\Program.cs:line 10
+   at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\ThrowStackDupWithoutRx\Program.cs:line 10
+```
+
+(To be precise, this _doesn't_ happen if you create a single `Task` and `await` that multiple times. It's the combination of `ExceptionDispatchInfo.Capture` and `ExceptionDispatchInfo.Throw` that causes this accumulation, and this `Task` captures exception information at the point when we create it with `Task.FromException`.)
+
+This appending of exception data is by design: `ExceptionDispatchInfo.Throw` is intended to append the current context to whatever was captured in the `ExceptionDispatchInfo`. The .NET runtime assumes that if you want to represent a new exceptional event that you will execute a `throw`. Rx does not do this in `await` (or other mechanisms that can rethrow an exception delivered through `OnError` such as `ToEnumerable`) precisely because it preserves whatever context was present when it received the exception. It does not perform a `throw` (or do anything else to reset the exception context) because this would prevent the full context being preserved in examples such as the `FileNotFoundException` handling shown earlier.
+
+This behaviour makes sense in the context for which it was designed—capturing the context in which an exception was initially thrown and augmenting it with additional information if it is rethrown from a different context. But unless you are aware of that, it's not at all obvious that although there's nothing inherently wrong with using `Observable.Throw<int>()`, it is not compatible with having multiple subscribers that will each rethrow the exception.
+
+
+## Decision
+
+Rx.NET will explicitly adopt this position: if a developer using Rx chooses to use a mechanism that takes exceptions delivered by an `IObservable<T>` and throws them (e.g. if you `await` an `IObservable<T>`) then it is the developer's responsibility to ensure that either:
+
+* each exception object is used only once
+
+or
+
+* the exception's dispatch state is reset prior to being supplied to the observer that will be rethrowing it (e.g., by executing a `throw`)
+
+Since Rx defines operators that won't conform to the first option (notably `Observable.Throw`, but also `ReplaySubject` and the related `Observable.Replay`) Rx 6.1 introduces a new operator, `ResetExceptionDispatchState`. This passes all notifications through, but effectively performs a `throw` on any `Exception` before forwarding it. It can be used like this:
+
+```cs
+var ts = Observable.Throw<int>(new Exception("Aaargh!")).ResetExceptionDispatchState();
+```
+
+When an observer subscribes to this, the `Throw` immediately calls `OnError`, and the `ResetExceptionDispatchState` will throw (and immediately catch) that exception before passing it on to the subscriber. (You would _not_ use this in scenarios such as the `Create` example shown earlier, because in that case each exception is freshly thrown, and has useful contextual information so we don't want to reset that. This is for use specifically in cases where the exception would not otherwise be thrown.)
+
+
+## Consequences
+
+By adopting this position, we make it clear that examples such as the one in [#2187](https://github.com/dotnet/reactive/issues/2187) are not expected to work correctly.
+
+More generally, this clarifies that observable sources that repeatedly produce the same exception object (e.g. `Observable.Throw` or `Observable.Repeat`) are incompatible with multiple calls to `await`.
+
+The addition of the `ResetExceptionDispatchState` operator provides a clear, simple way to fix code that runs into this problem.

+ 1 - 1
Rx.NET/Source/src/System.Reactive.Observable.Aliases/System.Reactive.Observable.Aliases.csproj

@@ -19,7 +19,7 @@
   </ItemGroup>
 
   <ItemGroup Condition="'$(TargetFramework)'=='uap10.0.18362'">
-    <ReferencePath Include="$(TargetPlatformSdkPath)UnionMetadata\10.0.19041.0\Windows.winmd" />
+    <ReferencePath Include="$(TargetPlatformSdkPath)UnionMetadata\10.0.26100.0\Windows.winmd" />
   </ItemGroup>
 
 </Project>

+ 1 - 0
Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs

@@ -582,6 +582,7 @@ namespace System.Reactive.Linq
         IObservable<TSource> AsObservable<TSource>(IObservable<TSource> source);
         IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, int count);
         IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, int count, int skip);
+        IObservable<TSource> ResetExceptionDispatchState<TSource>(IObservable<TSource> source);
         IObservable<TSource> Dematerialize<TSource>(IObservable<Notification<TSource>> source);
         IObservable<TSource> DistinctUntilChanged<TSource>(IObservable<TSource> source);
         IObservable<TSource> DistinctUntilChanged<TSource>(IObservable<TSource> source, IEqualityComparer<TSource> comparer);

+ 23 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable.Single.cs

@@ -134,6 +134,29 @@ namespace System.Reactive.Linq
 
         #endregion
 
+        #region + ResetExceptionDispatchState +
+
+        /// <summary>
+        /// Propagates all messages, but if <paramref name="source"/> produces an error, this updates the dispatch state
+        /// of the <see cref="Exception"/> to reflect the current execution context (by executing a
+        /// <see langword="throw" /> or equivalent operation) before passing the notification on.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <param name="source">Source sequence.</param>
+        /// <returns>The source sequence with the exception dispatch state modifying behavior applied.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
+        public static IObservable<TSource> ResetExceptionDispatchState<TSource>(this IObservable<TSource> source)
+        {
+            if (source == null)
+            {
+                throw new ArgumentNullException(nameof(source));
+            }
+
+            return s_impl.ResetExceptionDispatchState(source);
+        }
+
+        #endregion
+
         #region + Dematerialize +
 
         /// <summary>

+ 51 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/ResetExceptionDispatchState.cs

@@ -0,0 +1,51 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT License.
+// See the LICENSE file in the project root for more information. 
+
+namespace System.Reactive.Linq.ObservableImpl
+{
+    internal class ResetExceptionDispatchState<TSource> : Producer<TSource, ResetExceptionDispatchState<TSource>._>
+    {
+        private readonly IObservable<TSource> _source;
+
+        public ResetExceptionDispatchState(IObservable<TSource> source)
+        {
+            _source = source;
+        }
+
+        protected override _ CreateSink(IObserver<TSource> observer) => new(observer);
+
+        protected override void Run(_ sink) => sink.Run(_source);
+
+        internal sealed class _ : IdentitySink<TSource>
+        {
+            public _(IObserver<TSource> observer)
+                : base(observer)
+            {
+            }
+
+            public override void OnError(Exception error)
+            {
+                try
+                {
+                    // We use an ordinary throw and not the ExceptionDispatchInfo.Throw because the latter results in
+                    // ever-growing stack trace messages. That is what caused this issue:
+                    //  https://github.com/dotnet/reactive/issues/2187
+                    // It is a basic assumption of ExceptionDispatchInfo.Throw that each call to Throw matches
+                    // up with exactly one actual throw. Since Rx signals errors by passing Exceptions as arguments
+                    // to OnError instead of throwing them, there typically isn't a throw. Since we do use
+                    // ExceptionDispatchInfo.Throw in scenarios where we convert an OnError to the raising of an
+                    // actual exception (e.g., when code uses await on an IObservable<T>), the use of a singleton
+                    // exception object results in ever-growing StackTrace strings. The purpose of the
+                    // ResetExceptionDispatchState is to execute the throw that ExceptionDispatchInfo.Throw
+                    // expects to have happened.
+                    throw error;
+                }
+                catch
+                {
+                }
+                base.OnError(error);
+            }
+        }
+    }
+}

+ 31 - 6
Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs

@@ -2,6 +2,7 @@
  * WARNING: Auto-generated file (merged on 04/19/2023)
  * Run Rx's auto-homoiconizer tool to generate this file (in the HomoIcon directory).
  */
+
 #nullable enable
 #pragma warning disable 1591
 
@@ -1240,6 +1241,30 @@ namespace System.Reactive.Linq
             );
         }
 
+        /// <summary>
+        /// Propagates all messages, but if <paramref name="source" /> produces an error, this updates the dispatch state
+        /// of the <see cref="Exception" /> to reflect the current execution context (by executing a
+        /// <see langword="throw" /> or equivalent operation) before passing the notification on.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <param name="source">Source sequence.</param>
+        /// <returns>The source sequence with the exception dispatch state modifying behavior applied.</returns>
+        /// <exception cref="ArgumentNullException">
+        /// <paramref name="source" /> is null.</exception>
+        public static IQbservable<TSource> ResetExceptionDispatchState<TSource>(this IQbservable<TSource> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return source.Provider.CreateQuery<TSource>(
+                Expression.Call(
+                    null,
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()!).MakeGenericMethod(typeof(TSource)),
+                    source.Expression
+                )
+            );
+        }
+
         /// <summary>
         /// Uses <paramref name="selector" /> to determine which source in <paramref name="sources" /> to return, choosing an empty sequence if no match is found.
         /// </summary>
@@ -10794,10 +10819,10 @@ namespace System.Reactive.Linq
         }
 
         /// <summary>
-        /// Projects each element of an observable sequence to an observable sequence, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
+        /// Projects each element of an observable sequence to an enumerable sequence, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
-        /// <typeparam name="TCollection">The type of the elements in the projected intermediate sequences.</typeparam>
+        /// <typeparam name="TCollection">The type of the elements in the projected intermediate enumerable sequences.</typeparam>
         /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate sequence elements.</typeparam>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="collectionSelector">A transform function to apply to each element.</param>
@@ -10826,10 +10851,10 @@ namespace System.Reactive.Linq
         }
 
         /// <summary>
-        /// Projects each element of an observable sequence to an observable sequence by incorporating the element's index, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
+        /// Projects each element of an observable sequence to an enumerable sequence by incorporating the element's index, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
-        /// <typeparam name="TCollection">The type of the elements in the projected intermediate sequences.</typeparam>
+        /// <typeparam name="TCollection">The type of the elements in the projected intermediate enumerable sequences.</typeparam>
         /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate sequence elements.</typeparam>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="collectionSelector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
@@ -10858,10 +10883,10 @@ namespace System.Reactive.Linq
         }
 
         /// <summary>
-        /// Projects each element of an observable sequence to an enumerable sequence, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
+        /// Projects each element of an observable sequence to an observable sequence, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
-        /// <typeparam name="TCollection">The type of the elements in the projected intermediate enumerable sequences.</typeparam>
+        /// <typeparam name="TCollection">The type of the elements in the projected intermediate sequences.</typeparam>
         /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate sequence elements.</typeparam>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="collectionSelector">A transform function to apply to each element.</param>

+ 9 - 0
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Single.cs

@@ -77,6 +77,15 @@ namespace System.Reactive.Linq
 
         #endregion
 
+        #region + ResetExceptionDispatchState +
+
+        public virtual IObservable<TSource> ResetExceptionDispatchState<TSource>(IObservable<TSource> source)
+        {
+            return new ResetExceptionDispatchState<TSource>(source);
+        }
+
+        #endregion
+
         #region + Dematerialize +
 
         public virtual IObservable<TSource> Dematerialize<TSource>(IObservable<Notification<TSource>> source)

+ 1 - 1
Rx.NET/Source/src/System.Reactive/System.Reactive.csproj

@@ -21,7 +21,7 @@
   </PropertyGroup>
 
   <ItemGroup Condition="'$(TargetFramework)'=='uap10.0.18362'">
-    <ReferencePath Include="$(TargetPlatformSdkPath)UnionMetadata\10.0.19041.0\Windows.winmd" />
+    <ReferencePath Include="$(TargetPlatformSdkPath)UnionMetadata\10.0.26100.0\Windows.winmd" />
 
   </ItemGroup>
 

+ 3 - 1
Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Core.verified.cs

@@ -1340,6 +1340,7 @@ namespace System.Reactive.Linq
         public static System.IObservable<TResult> Replay<TSource, TResult>(this System.IObservable<TSource> source, System.Func<System.IObservable<TSource>, System.IObservable<TResult>> selector, int bufferSize, System.TimeSpan window) { }
         public static System.IObservable<TResult> Replay<TSource, TResult>(this System.IObservable<TSource> source, System.Func<System.IObservable<TSource>, System.IObservable<TResult>> selector, System.TimeSpan window, System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.IObservable<TResult> Replay<TSource, TResult>(this System.IObservable<TSource> source, System.Func<System.IObservable<TSource>, System.IObservable<TResult>> selector, int bufferSize, System.TimeSpan window, System.Reactive.Concurrency.IScheduler scheduler) { }
+        public static System.IObservable<TSource> ResetExceptionDispatchState<TSource>(this System.IObservable<TSource> source) { }
         public static System.IObservable<TSource> Retry<TSource>(this System.IObservable<TSource> source) { }
         public static System.IObservable<TSource> Retry<TSource>(this System.IObservable<TSource> source, int retryCount) { }
         public static System.IObservable<TSource> RetryWhen<TSource, TSignal>(this System.IObservable<TSource> source, System.Func<System.IObservable<System.Exception>, System.IObservable<TSignal>> handler) { }
@@ -2187,6 +2188,7 @@ namespace System.Reactive.Linq
         public static System.Reactive.Linq.IQbservable<TResult> Replay<TSource, TResult>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<System.IObservable<TSource>, System.IObservable<TResult>>> selector, int bufferSize, System.TimeSpan window) { }
         public static System.Reactive.Linq.IQbservable<TResult> Replay<TSource, TResult>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<System.IObservable<TSource>, System.IObservable<TResult>>> selector, System.TimeSpan window, System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.Reactive.Linq.IQbservable<TResult> Replay<TSource, TResult>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<System.IObservable<TSource>, System.IObservable<TResult>>> selector, int bufferSize, System.TimeSpan window, System.Reactive.Concurrency.IScheduler scheduler) { }
+        public static System.Reactive.Linq.IQbservable<TSource> ResetExceptionDispatchState<TSource>(this System.Reactive.Linq.IQbservable<TSource> source) { }
         public static System.Reactive.Linq.IQbservable<TSource> Retry<TSource>(this System.Reactive.Linq.IQbservable<TSource> source) { }
         public static System.Reactive.Linq.IQbservable<TSource> Retry<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, int retryCount) { }
         public static System.Reactive.Linq.IQbservable<TSource> RetryWhen<TSource, TSignal>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<System.IObservable<System.Exception>, System.IObservable<TSignal>>> handler) { }
@@ -3200,4 +3202,4 @@ namespace System.Runtime.CompilerServices
             where TStateMachine : System.Runtime.CompilerServices.IAsyncStateMachine { }
         public static System.Runtime.CompilerServices.TaskObservableMethodBuilder<T> Create() { }
     }
-}
+}

+ 12 - 2
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/MaxTest.cs

@@ -2353,7 +2353,12 @@ namespace ReactiveTests.Tests
                 OnCompleted<string>(240)
             );
 
-            var res = scheduler.Start(() => xs.Max(x => new string(x.ToCharArray().Reverse().ToArray())));
+            // Note: AsEnumerable required because on .NET 10 SDK (C# 14) but when targetting .NET 8, we hit the
+            // issue at https://github.com/dotnet/runtime/issues/107723 and don't have access to the fix
+            // implemented in https://github.com/dotnet/runtime/pull/107957
+            // When test projects no longer target any version of .NET older than 10.0, we can revert this,
+            // removing the AsEnumerable.
+            var res = scheduler.Start(() => xs.Max(x => new string(x.ToCharArray().AsEnumerable().Reverse().ToArray())));
 
             res.Messages.AssertEqual(
                 OnNext(240, "xuq"),
@@ -2377,7 +2382,12 @@ namespace ReactiveTests.Tests
                 OnCompleted<string>(240)
             );
 
-            var res = scheduler.Start(() => xs.Max(x => new string(x.ToCharArray().Reverse().ToArray()), new ReverseComparer<string>(Comparer<string>.Default)));
+            // Note: AsEnumerable required because on .NET 10 SDK (C# 14) but when targetting .NET 8, we hit the
+            // issue at https://github.com/dotnet/runtime/issues/107723 and don't have access to the fix
+            // implemented in https://github.com/dotnet/runtime/pull/107957
+            // When test projects no longer target any version of .NET older than 10.0, we can revert this,
+            // removing the AsEnumerable.
+            var res = scheduler.Start(() => xs.Max(x => new string(x.ToCharArray().AsEnumerable().Reverse().ToArray()), new ReverseComparer<string>(Comparer<string>.Default)));
 
             res.Messages.AssertEqual(
                 OnNext(240, "oof"),

+ 12 - 2
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/MinTest.cs

@@ -2354,7 +2354,12 @@ namespace ReactiveTests.Tests
                 OnCompleted<string>(240)
             );
 
-            var res = scheduler.Start(() => xs.Min(x => new string(x.ToCharArray().Reverse().ToArray())));
+            // Note: AsEnumerable required because on .NET 10 SDK (C# 14) but when targetting .NET 8, we hit the
+            // issue at https://github.com/dotnet/runtime/issues/107723 and don't have access to the fix
+            // implemented in https://github.com/dotnet/runtime/pull/107957
+            // When test projects no longer target any version of .NET older than 10.0, we can revert this,
+            // removing the AsEnumerable.
+            var res = scheduler.Start(() => xs.Min(x => new string(x.ToCharArray().AsEnumerable().Reverse().ToArray())));
 
             res.Messages.AssertEqual(
                 OnNext(240, "oof"),
@@ -2378,7 +2383,12 @@ namespace ReactiveTests.Tests
                 OnCompleted<string>(240)
             );
 
-            var res = scheduler.Start(() => xs.Min(x => new string(x.ToCharArray().Reverse().ToArray()), new ReverseComparer<string>(Comparer<string>.Default)));
+            // Note: AsEnumerable required because on .NET 10 SDK (C# 14) but when targetting .NET 8, we hit the
+            // issue at https://github.com/dotnet/runtime/issues/107723 and don't have access to the fix
+            // implemented in https://github.com/dotnet/runtime/pull/107957
+            // When test projects no longer target any version of .NET older than 10.0, we can revert this,
+            // removing the AsEnumerable.
+            var res = scheduler.Start(() => xs.Min(x => new string(x.ToCharArray().AsEnumerable().Reverse().ToArray()), new ReverseComparer<string>(Comparer<string>.Default)));
 
             res.Messages.AssertEqual(
                 OnNext(240, "xuq"),

+ 73 - 0
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/ResetExceptionDispatchStateTest.cs

@@ -0,0 +1,73 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT License.
+// See the LICENSE file in the project root for more information. 
+
+using System;
+using System.Reactive.Concurrency;
+using System.Reactive.Linq;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+
+using Assert = Xunit.Assert;
+using System.Threading.Tasks;
+
+namespace ReactiveTests.Tests
+{
+    [TestClass]
+    public class ResetExceptionDispatchStateTest
+    {
+
+        [TestMethod]
+        public async Task ResetExceptionDispatchState_Throw_Consistent_StackTrace_On_Await()
+        {
+            var ts = Observable.Throw<int>(new Exception("Aaargh!")).ResetExceptionDispatchState();
+
+            string stackTrace = null;
+            for (int i = 0; i < 3; i++)
+            {
+                try
+                {
+                    _ = await ts;
+                }
+                catch (Exception e)
+                {
+                    if (stackTrace is null)
+                    {
+                        stackTrace = e.StackTrace;
+                    }
+                    else
+                    {
+                        Assert.Equal(stackTrace, e.StackTrace);
+                    }
+                }
+            }
+        }
+
+        [TestMethod]
+        public async Task ResetExceptionDispatchState_Replay_Consistent_StackTrace_On_Await()
+        {
+            var cts = Observable.Throw<int>(new Exception("Aaargh!"), CurrentThreadScheduler.Instance).Replay(1);
+            cts.Connect();
+            var ts = cts.ResetExceptionDispatchState();
+
+            string stackTrace = null;
+            for (int i = 0; i < 3; i++)
+            {
+                try
+                {
+                    _ = await ts;
+                }
+                catch (Exception e)
+                {
+                    if (stackTrace is null)
+                    {
+                        stackTrace = e.StackTrace;
+                    }
+                    else
+                    {
+                        Assert.Equal(stackTrace, e.StackTrace);
+                    }
+                }
+            }
+        }
+    }
+}

+ 1 - 1
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/ThrowTest.cs

@@ -11,6 +11,7 @@ using ReactiveTests.Dummies;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 
 using Assert = Xunit.Assert;
+using System.Threading.Tasks;
 
 namespace ReactiveTests.Tests
 {
@@ -114,6 +115,5 @@ namespace ReactiveTests.Tests
 
             Assert.Same(ex, res);
         }
-
     }
 }

+ 9 - 3
Rx.NET/tools/HomoIcon/HomoIcon.csproj

@@ -1,5 +1,5 @@
 <?xml version="1.0" encoding="utf-8"?>
-<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
   <PropertyGroup>
     <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
     <Platform Condition=" '$(Platform)' == '' ">x86</Platform>
@@ -10,8 +10,9 @@
     <AppDesignerFolder>Properties</AppDesignerFolder>
     <RootNamespace>HomoIcon</RootNamespace>
     <AssemblyName>HomoIcon</AssemblyName>
-    <TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
-    <TargetFrameworkProfile>Client</TargetFrameworkProfile>
+    <TargetFrameworkVersion>v4.8</TargetFrameworkVersion>
+    <TargetFrameworkProfile>
+    </TargetFrameworkProfile>
     <FileAlignment>512</FileAlignment>
     <SccProjectName>SAK</SccProjectName>
     <SccLocalPath>SAK</SccLocalPath>
@@ -28,6 +29,7 @@
     <ErrorReport>prompt</ErrorReport>
     <WarningLevel>4</WarningLevel>
     <UseVSHostingProcess>false</UseVSHostingProcess>
+    <Prefer32Bit>false</Prefer32Bit>
   </PropertyGroup>
   <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|x86' ">
     <PlatformTarget>x86</PlatformTarget>
@@ -37,6 +39,7 @@
     <DefineConstants>TRACE</DefineConstants>
     <ErrorReport>prompt</ErrorReport>
     <WarningLevel>4</WarningLevel>
+    <Prefer32Bit>false</Prefer32Bit>
   </PropertyGroup>
   <ItemGroup>
     <Reference Include="System" />
@@ -48,6 +51,9 @@
     <Compile Include="Program.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
   </ItemGroup>
+  <ItemGroup>
+    <None Include="app.config" />
+  </ItemGroup>
   <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
   <!-- To modify your build process, add your task inside one of the targets below and uncomment it. 
        Other similar extension points exist, see Microsoft.Common.targets.

+ 3 - 0
Rx.NET/tools/HomoIcon/app.config

@@ -0,0 +1,3 @@
+<?xml version="1.0" encoding="utf-8"?>
+<configuration>
+<startup><supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.8"/></startup></configuration>

+ 1 - 3
azure-pipelines.rx.yml

@@ -25,14 +25,12 @@ stages:
   jobs:
   - job: Build
     pool:
-      vmImage: windows-latest
+      vmImage: windows-2025
 
     variables:
       BuildConfiguration: Release
       BuildPlatform: Any CPU
       DOTNET_SKIP_FIRST_TIME_EXPERIENCE: true
-      # TODO: work out why the build doesn't set TargetPlatformSdkPath correctly on the build agent
-      TargetPlatformSdkPath: 'C:\Program Files (x86)\Windows Kits\10\'
 
     steps:
     - task: UseDotNet@2