|
@@ -118,7 +118,7 @@ public class RxFsEvents : IObservable<FileSystemEventArgs>
|
|
|
{
|
|
|
// The FileSystemWatcher might report multiple errors, but
|
|
|
// we're only allowed to report one to IObservable<T>.
|
|
|
- if (onErrorAlreadyCalled)
|
|
|
+ if (!onErrorAlreadyCalled)
|
|
|
{
|
|
|
observer.OnError(e.GetException());
|
|
|
onErrorAlreadyCalled = true;
|
|
@@ -149,7 +149,7 @@ IObservable<FileSystemEventArgs> deletions =
|
|
|
fs.Where(e => e.ChangeType == WatcherChangeTypes.Deleted);
|
|
|
```
|
|
|
|
|
|
-When you call `Subscribe` on the `IObservable<T>` returned by the `Where` operator, it will call `Subscribe` on its input. So in this case, if we call `Subscribe` on both `configChanges` and `deletions`, that will result in _two_ calls to `Subscribe` on `rs`. So if `rs` is an instance of our `RxFsEvents` type above, each one will construct its own `FileSystemEventWatcher`, which is inefficient.
|
|
|
+When you call `Subscribe` on the `IObservable<T>` returned by the `Where` operator, it will call `Subscribe` on its input. So in this case, if we call `Subscribe` on both `configChanges` and `deletions`, that will result in _two_ calls to `Subscribe` on `fs`. So if `fs` is an instance of our `RxFsEvents` type above, each one will construct its own `FileSystemEventWatcher`, which is inefficient.
|
|
|
|
|
|
Rx offers a few ways to deal with this. It provides operators designed specifically to take an `IObservable<T>` that does not tolerate multiple subscribers and wrap it in an adapter that can:
|
|
|
|
|
@@ -402,7 +402,7 @@ You provide this with a delegate that will be executed each time a subscription
|
|
|
private IObservable<int> SomeNumbers()
|
|
|
{
|
|
|
return Observable.Create<int>(
|
|
|
- (IObserver<string> observer) =>
|
|
|
+ (IObserver<int> observer) =>
|
|
|
{
|
|
|
observer.OnNext(1);
|
|
|
observer.OnNext(2);
|
|
@@ -416,7 +416,7 @@ private IObservable<int> SomeNumbers()
|
|
|
|
|
|
Your delegate must return either an `IDisposable` or an `Action` to enable unsubscription. When the subscriber disposes their subscription in order to unsubscribe, Rx will invoke `Dispose()` on the `IDisposable` you returned, or in the case where you returned an `Action`, it will invoke that.
|
|
|
|
|
|
-This example is reminiscent of the `MySequenceOfNumbers` example from the start of this chapter, in that it immediately produces a few fixed values. The main difference in this case is that Rx adds some wrappers that can handle awkward situations such as re-entrancy. Rx will sometimes automatically defer work to prevent deadlocks, so it's possible that code consuming the `IObservable<string>` returned by this method will see a call to `Subscribe` return before the callback in the code above runs, in which case it would be possible for them to unsubscribe inside their `OnNext` handler.
|
|
|
+This example is reminiscent of the `MySequenceOfNumbers` example from the start of this chapter, in that it immediately produces a few fixed values. The main difference in this case is that Rx adds some wrappers that can handle awkward situations such as re-entrancy. Rx will sometimes automatically defer work to prevent deadlocks, so it's possible that code consuming the `IObservable<int>` returned by this method will see a call to `Subscribe` return before the callback in the code above runs, in which case it would be possible for them to unsubscribe inside their `OnNext` handler.
|
|
|
|
|
|
The following sequence diagram shows how this could occur in practice. Suppose the `IObservable<int>` returned by `SomeNumbers` has been wrapped by Rx in a way that ensures that subscription occurs in some different execution context. We'd typically determine the context by using a suitable [scheduler](11_SchedulingAndThreading.md#schedulers). (The [`SubscribeOn`](11_SchedulingAndThreading.md#subscribeon-and-observeon) operator creates such a wrapper.) We might use the [`TaskPoolScheduler`](11_SchedulingAndThreading.md#taskpoolscheduler) in order to ensure that the subscription occurs on some task pool thread. So when our application code calls `Subscribe`, the wrapper `IObservable<int>` doesn't immediately subscribe to the underlying observable. Instead it queues up a work item with the scheduler to do that, and then immediately returns without waiting for that work to run. This is how our subscriber can be in possession of an `IDisposable` representing the subscription before `Observable.Create` invokes our callback. The diagram shows the subscriber then making this available to the observer.
|
|
|
|
|
@@ -1297,4 +1297,4 @@ As a quick recap:
|
|
|
- IEnumerable<T>.ToObservable
|
|
|
- Observable.FromAsyncPattern
|
|
|
|
|
|
-Creating an observable sequence is our first step to practical application of Rx: create the sequence and then expose it for consumption. Now that we have a firm grasp on how to create an observable sequence, we can look in more detail at the operators that allow us to describe processing to be applied, to build up more complex observable sequences.
|
|
|
+Creating an observable sequence is our first step to practical application of Rx: create the sequence and then expose it for consumption. Now that we have a firm grasp on how to create an observable sequence, we can look in more detail at the operators that allow us to describe processing to be applied, to build up more complex observable sequences.
|