@@ -6,7 +6,181 @@ Rx's compositional nature is the key to its power, but it can look like a proble
You've already seen some answers to these questions. The [Creating Observable Sequences chapter](03_CreatingObservableSequences.md) showed various ways to create observable sources. But when it comes to handling the items that emerge from an `IObservable<T>`, all we've really seen is how to implement [`IObserver<T>`](02_KeyTypes.md#iobserver), and [how to use the callback-based `Subscribe` extension methods to subscribe to an `IObservable<T>`](02_KeyTypes.md#iobservable).
-In this chapter, we will look at the methods in Rx which allow you to leave the `IObservable<T>` world, so you can take action based on the notifications that emerge from an Rx source.
+In this chapter, we will look at the methods in Rx which allow you to leave the `IObservable<T>` world, so you can take action based on the notifications that emerge from an Rx source. But first, we need to look at the challenge this creates for error handling.
+
+## Exception state
+
+There's a rule you must conform to when exceptions leave Rx's world: if a developer using Rx chooses to use a mechanism that takes exceptions delivered by an `IObservable<T>` and throws them (e.g. if you `await` an `IObservable<T>`) then it is the developer's responsibility to ensure that either:
+
+* each exception object is used only once
+
+or
+
+* the exception's dispatch state is reset prior to being supplied to the observer that will be rethrowing it (e.g., by executing a `throw`)
+
+For example, if you use the [`Throw`](03_CreatingObservableSequences.md#observablethrow) operator, this creates a problem if you plan to `await` the resulting sequence: by design, `Throw` does nothing at all to the state of the exception you supply. This is deliberate, because you might be giving it an exception that already has important state there and it does not want to destroy that information. However, in cases where the exception has never actually been thrown, the state never gets set (and therefore never reset). There is no good way to detect whether exception state is present, so `Throw` must conservatively assume that it might be. This means that if you were to `await` such a sequence twice, you'd be breaking these rules. Rx 6.1 introduced a new operator, [`ResetExceptionDispatchState`](#resetexceptiondispatchstate), to deal with this scenario. If you write something like this:
+
+```cs
+IObservable<int> t = Observable
+ .Throw<int>(new Exception("Boom!"))
+ .ResetExceptionDispatchState();
+```
+
+the presence of that `ResetExceptionDispatchState` ensures that when the `Throw` reports the error by calling `OnError`, the exception state is reset at that instant.
+
+But why do these rules exist?
+
+Code that lives entirely within Rx's world reports errors by having an `IObservable<T>` invoke an observer's `OnError` method, passing an `Exception` representing the error. Exceptions are reported, discovered, and handled without ever using `throw` or `catch`. This makes Rx's use of exceptions somewhat unusual, and there's a consequence: if you construct but do not throw an exception, its _exception state_ is never set.
+
+Exception state is the information describing the context in which the exception was thrown. It includes a textual description of the call stack, which can help developers diagnose the problem. It also includes more machine-friendly information, describing the crash site with technical identifiers that can help automated systems analyze errors. [Windows Error Reporting](https://learn.microsoft.com/en-us/windows/win32/wer/windows-error-reporting) can use this to distinguish between different kinds of application failures. For desktop applications, this information can (with user consent) be collected centrally, enabling developers to discover which crashes users encounter most often in practical use. Failures are identified by the particular combination of technical identifiers (which effectively identify the particular location in the code that failed, and also the type of failure that occurred). Each unique combination is known informally as a _fault bucket_. You can see this information in the Windows Event Viewer. If a .NET application throws an unhandled exception, you typically get three events in the event log:
+
+* a **.NET Runtime** entry (typically with Event ID 1026) reporting the application executable name, .NET version, and .NET stack trace text
+* an **Application Crashing Events** entry (typically with Event ID 1000) with generic (non-.NET-specific) details in a form that would appear for any application crash, such as the executable path, Win32 SEH exception type, the DLL in which the exception originated and the offset within that DLL
+* a **Windows Error Reporting** entry (typically with Event ID 1001) containing the _fault bucket_ details
+
+Here's an example of that third kind from a crashing .NET application:
+
+```
+Fault bucket 1162564721043373785, type 4
+Event Name: APPCRASH
+Response: Not available
+Cab Id: 0
+
+Problem signature:
+P1: TestResetExceptionDispatchState.exe
+P2: 1.0.0.0
+P3: 68a40000
+P4: KERNELBASE.dll
+P5: 10.0.26100.6584
+P6: 0a9b38fe
+P7: e0434352
+P8: 00000000000c66ca
+P9:
+P10:
+```
+
+This is harder to understand than a stack trace, but these numeric identifiers make it easier to determine automatically whether two crashes are in some sense 'the same'. The idea here is to make it possible to discover when the same kind of error is happening a lot, enabling developers to focus their efforts on fixing the types of crashes causing the most trouble.
+
+With a .NET application, we want this information to reflect the origin of the failure. But it's quite common for exceptions to be caught and rethrown. What we _don't_ want is for every single error to be categorised as the same type just because we wrote some common error handling code. .NET itself has to contend with this: when we use `await` to wait for an asynchronous operation that has failed, that failure exception will have been put into a `Task` or `ValueTask`, and .NET rethrows that into the code that calls `await`. It would be deeply unhelpful if every single failing asynchronous operation was reported to Windows Error Reporting as having occurred inside the .NET runtime library code that rethrows exceptions captured by a `Task`!
+
+The purpose of _exception state_ is to hold onto the exception's origin information even if the exception gets rethrown. If you rethrow an exception from within a `catch` block by using `throw;` this exception's state mostly gets preserved. ('Mostly', because the stack information gets augmented: the stack trace will report both the original throw location and the rethrow location.) This does _not_ work correctly if you try to rethrow the exception with `throw ex;` inside the `catch` block, which is why it's important to use the no-arguments `throw;` form when rethrowing. As for exceptions that end up being caught in one place and rethrown in a completely different place (e.g. on a different thread because `await` is being used) the .NET runtime libraries provide a helper called [`ExceptionDispatchInfo`](https://learn.microsoft.com/dotnet/api/system.runtime.exceptionservices.exceptiondispatchinfo) that can help manage this exception state, and ensure that it is used when rethrowing an exception in a completely different context from the one in which it was thrown.
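
As a rough illustration of the capture-and-rethrow pattern just described, the following sketch uses only the .NET runtime libraries (no Rx). `ThrowOriginal` is a hypothetical helper invented for this example, and the two `try` blocks stand in for code that would normally live in different places. The trace printed at the end should mention the original throw site inside `ThrowOriginal` as well as the rethrow location.

```cs
using System;
using System.Runtime.ExceptionServices;

ExceptionDispatchInfo? captured = null;

try
{
    ThrowOriginal();
}
catch (Exception x)
{
    // Capture the exception together with the state recorded when it was thrown.
    captured = ExceptionDispatchInfo.Capture(x);
}

try
{
    // Rethrow from a different context. Unlike 'throw ex;', this keeps
    // the original throw location in the stack trace.
    captured?.Throw();
}
catch (Exception x)
{
    Console.WriteLine(x);
}

// Hypothetical helper: exists only to give the exception a distinct origin.
static void ThrowOriginal() => throw new InvalidOperationException("Original failure");
```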
+
+Rx uses this to ensure that examples such as the following behave as expected:
+
+```cs
+IObservable<string> fileLines = Observable.Create<string>(async obs =>
+{
+ using var reader = new StreamReader(@"c:\temp\test.txt");
+
+ while ((await reader.ReadLineAsync()) is string line)
+ {
+ obs.OnNext(line);
+ }
+});
+
+string firstNonEmptyLine = await fileLines
+ .FirstAsync(line => line.Length > 0);
+Console.WriteLine(firstNonEmptyLine);
+```
+
+If the attempt to open the file throws an exception, what do we expect to see? A developer familiar with how exceptions generally work with `async`/`await` in .NET might reasonably expect the exception to report two stack traces: one for the point at which the exception was originally thrown, and another for where it was rethrown from the `await`. And that's exactly what we see:
+
+```
+Unhandled exception. System.IO.FileNotFoundException: Could not find file 'c:\temp\test.txt'.
+File name: 'c:\temp\test.txt'
+ at Microsoft.Win32.SafeHandles.SafeFileHandle.CreateFile(String fullPath, FileMode mode, FileAccess access, FileShare share, FileOptions options)
+ at Microsoft.Win32.SafeHandles.SafeFileHandle.Open(String fullPath, FileMode mode, FileAccess access, FileShare share, FileOptions options, Int64 preallocationSize, Nullable`1 unixCreateMode)
+ at System.IO.Strategies.OSFileStreamStrategy..ctor(String path, FileMode mode, FileAccess access, FileShare share, FileOptions options, Int64 preallocationSize, Nullable`1 unixCreateMode)
+ at System.IO.Strategies.FileStreamHelpers.ChooseStrategyCore(String path, FileMode mode, FileAccess access, FileShare share, FileOptions options, Int64 preallocationSize, Nullable`1 unixCreateMode)
+ at System.IO.StreamReader.ValidateArgsAndOpenPath(String path, Encoding encoding, Int32 bufferSize)
+ at System.IO.StreamReader..ctor(String path)
+ at Program.<>c.<<<Main>$>b__0_0>d.MoveNext() in D:\source\RxThrowExamples\RxThrowExamples\Program.cs:line 5
+--- End of stack trace from previous location ---
+ at System.Reactive.PlatformServices.ExceptionServicesImpl.Rethrow(Exception exception)
+ at System.Reactive.ExceptionHelpers.Throw(Exception exception)
+ at System.Reactive.Subjects.AsyncSubject`1.GetResult()
+ at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\RxThrowExamples\Program.cs:line 13
+ at Program.<Main>(String[] args)
+```
+
+This conforms to the rules described above because the exception is thrown in the conventional .NET manner here. Rx then catches it—this overload of `Observable.Create` wraps the `Task` returned by the callback in an adapter that detects when the `Task` enters a faulted state, in which case it extracts the exception and passes it to the subscribing `IObserver<T>`. And then the awaiter that Rx provides when you `await` an observable rethrows this same exception. Rx uses the `ExceptionDispatchInfo` mechanisms to ensure that the exception state captured at the point where the exception was originally thrown remains present when the exception is eventually rethrown to the code that used `await` on the observable.
+
+This is fine when the exception originates by being thrown in the conventional way, which it does in that last example. Where it can go wrong is if the exception never gets thrown at all: the rethrowing mechanism provided by `ExceptionDispatchInfo` gets a little confused in this case. This problem is not specific to Rx, by the way, as the following code illustrates:
+
+```cs
+Exception ox = new("Kaboom!");
+
+for (int i = 0; i < 3; ++i)
+{
+ Console.WriteLine();
+ Console.WriteLine();
+
+ try
+ {
+ await Task.FromException(ox); // Exception thrown here
+
+ }
+ catch (Exception x)
+ {
+ Console.WriteLine(x);
+ }
+}
+```
+
+This creates a single `Exception` object and then wraps it in a `Task` (using `Task.FromException`) which it immediately `await`s. There is no Rx code here. Running this we get the following slightly surprising behaviour:
+
+```
+System.Exception: Kaboom!
+ at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\ThrowStackDupWithoutRx\Program.cs:line 10
+
+
+System.Exception: Kaboom!
+ at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\ThrowStackDupWithoutRx\Program.cs:line 10
+ at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\ThrowStackDupWithoutRx\Program.cs:line 10
+
+
+System.Exception: Kaboom!
+ at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\ThrowStackDupWithoutRx\Program.cs:line 10
+ at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\ThrowStackDupWithoutRx\Program.cs:line 10
+ at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\ThrowStackDupWithoutRx\Program.cs:line 10
+```
+
+Notice that each time around the loop, the stack trace gets longer. The problem here is that nothing ever resets the exception dispatch state. Normally that happens when the original `throw` occurs, but in this example we never use `throw`, and so that reset never happens. When you `await` a `Task`, the .NET runtime uses `ExceptionDispatchInfo` to rethrow the exception, and the point of that mechanism is that it preserves the original exception's state, and appends the current stack so that you get a complete record of the exception's history. It wasn't designed to cope with an exception being rethrown multiple times without corresponding multiple `throw`s.
+
+Since Rx uses the same mechanism when exceptions emerge out of Rx's world, the Rx equivalent of that `Task` example has exactly the same problem:
+
+```cs
+IObservable<int> ts = Observable.Throw<int>(new Exception("Pow!"));
+
+for (int i = 0; i < 3; ++i)
+{
+ Console.WriteLine();
+ Console.WriteLine();
+
+ try
+ {
+ await ts; // Exception thrown here
+
+ }
+ catch (Exception x)
+ {
+ Console.WriteLine(x);
+ }
+}
+```
+
+This will also produce a longer stack trace each time round the loop, because again, we're rethrowing the same exception object multiple times without ever resetting its state.
+
+Remember, Rx deliberately preserves the state because for all it knows, that state was important. (This is how the example with the `FileNotFoundException` above reports the correct information.) This is why Rx 6.1 introduced the [`ResetExceptionDispatchState`](#resetexceptiondispatchstate) operator. It enables us to tell Rx that we do in fact want it to reset the exception state each time an error emerges from an observable. If we change the first statement of the example above to this:
+
+```cs
+IObservable<int> ts = Observable
+ .Throw<int>(new Exception("Pow!"))
+ .ResetExceptionDispatchState();
+```
+
+this now conforms to the rules described at the start of this section, which prevents the ever-growing stack trace problem.
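
The other way to conform to those rules is to ensure each exception object is used only once. The following is a minimal sketch of that idea, assuming the standard `Observable.Defer` operator is an acceptable way to build the source. Because `Defer` invokes its factory for every subscription, each `await` should receive a newly constructed exception, so no single exception object is rethrown more than once.

```cs
using System;
using System.Reactive.Linq;

// Defer runs the factory once per subscription, so every await below
// gets its own Exception instance rather than sharing one.
IObservable<int> ts = Observable.Defer(
    () => Observable.Throw<int>(new Exception("Pow!")));

for (int i = 0; i < 3; ++i)
{
    try
    {
        await ts; // A fresh exception is rethrown here each time
    }
    catch (Exception x)
    {
        Console.WriteLine(x);
    }
}
```

This trades a small allocation per subscription for not needing any state reset at all, which may be preferable when the exception is cheap to construct.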
+
## Integration with `async` and `await`
@@ -209,6 +383,18 @@ public static IObservable<IDictionary<TKey, TSource>> ToDictionary<TSource, TKey
The `ToLookup` extension offers near-identical-looking overloads, the difference being the return type (and the name, obviously). They all return an `IObservable<ILookup<TKey, TElement>>`. As with LINQ to Objects, the distinction between a dictionary and a lookup is that the `ILookup<TKey, TElement>` interface allows each key to have any number of values, whereas a dictionary maps each key to one value.
+## ResetExceptionDispatchState
+
+The `ResetExceptionDispatchState` operator instructs Rx to reset the _exception dispatch state_ of any exception passed to `OnError` before passing the exception on to its observer. Other than that, this operator just passes all notifications through unmodified.
+
+The purpose of this operator is to make it easier to conform with the rules described in the [Exception state](#exception-state) section above. You typically only need to use it when an exception originates from an Rx observable without ever having executed a `throw`, and you then go on to cause that exception to leave Rx's world and to be rethrown in the normal .NET way.
+
+For example, [`Observable.Throw`](03_CreatingObservableSequences.md#observablethrow) does not reset the state of the exception you give it (in case there was anything important in that state), but if you might be using a world-crossing mechanism like `await` that will rethrow the exception, and if you might do so multiple times for the same exception object, you will need to use this operator to ensure that the exception state gets reset each time.
+
+This can also be useful when using the [`Replay`](15_PublishingOperators.md#replay) operator, because even if the underlying source is using `throw` (thus resetting the exception state), `Replay` will hold onto the exception, and will deliver the same object to any subsequent subscribers. The use of `Replay` effectively prevents the reset that would otherwise have happened. So if you then go on to use `await` (or any other of the mechanisms in this chapter that would cause an exception delivered to `OnError` to be rethrown) you will no longer be following the rules described in [Exception state](#exception-state), which can result in problems such as ever-longer stack traces.
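
Here is a sketch of the `Replay` scenario. It assumes Rx 6.1 or later, and it assumes that `ResetExceptionDispatchState` should be applied downstream of `Replay` so that it sees every replayed `OnError`. The names `failing`, `replayed`, and `safeToAwait` are illustrative only.

```cs
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// A source that throws conventionally, then reports the caught exception.
IObservable<int> failing = Observable.Create<int>(obs =>
{
    try
    {
        throw new InvalidOperationException("Bang!");
    }
    catch (Exception x)
    {
        obs.OnError(x);
    }
    return Disposable.Empty;
});

// Replay holds onto the one exception object and hands it to every
// later subscriber.
IConnectableObservable<int> replayed = failing.Replay();
using IDisposable connection = replayed.Connect();

// Assumption: placing the operator after Replay resets the state for
// each subscriber that goes on to rethrow the exception.
IObservable<int> safeToAwait = replayed.ResetExceptionDispatchState();

for (int i = 0; i < 3; ++i)
{
    try
    {
        await safeToAwait;
    }
    catch (Exception x)
    {
        Console.WriteLine(x);
    }
}
```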
+
+If you remain entirely within Rx's world, you should not need to use `ResetExceptionDispatchState`. This operator exists only to deal with a problem that can occur when crossing from Rx's world to the world of conventional exception throwing.
+
## ToTask
Although Rx provides direct support for using `await` with an `IObservable<T>`, it can sometimes be useful to obtain a `Task<T>` representing an `IObservable<T>`. This is useful because some APIs expect a `Task<T>`. You can call `ToTask()` on any `IObservable<T>`, and this will subscribe to that observable, returning a `Task<T>` that will complete when the sequence completes, producing the sequence's final output as the task's result. If the source completes without producing an element, the task will enter a faulted state, with an `InvalidOperationException` complaining that the input sequence contains no elements.
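
The following short sketch shows both outcomes described above. It assumes the `ToTask` extension method is brought into scope by the `System.Reactive.Threading.Tasks` namespace. The variable names are illustrative.

```cs
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

// The task's result is the final element of the sequence.
Task<int> lastValue = Observable.Range(1, 3).ToTask();
Console.WriteLine(await lastValue); // prints 3

// An empty sequence leaves the task in a faulted state.
Task<int> empty = Observable.Empty<int>().ToTask();
try
{
    await empty;
}
catch (InvalidOperationException x)
{
    Console.WriteLine(x.Message);
}
```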