소스 검색

Changing locking strategy in ReplaySubject.

Bart De Smet 10 년 전
부모
커밋
14b027fb90

+ 275 - 184
Rx.NET/Source/System.Reactive.Linq/Reactive/Subjects/ReplaySubject.cs

@@ -30,8 +30,8 @@ namespace System.Reactive.Subjects
         /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class.
         /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class.
         /// </summary>
         /// </summary>
         public ReplaySubject()
         public ReplaySubject()
+            : this(int.MaxValue)
         {
         {
-            _implementation = new ReplayAll();
         }
         }
 
 
         /// <summary>
         /// <summary>
@@ -216,7 +216,6 @@ namespace System.Reactive.Subjects
         private interface IReplaySubjectImplementation : ISubject<T>, IDisposable
         private interface IReplaySubjectImplementation : ISubject<T>, IDisposable
         {
         {
             bool HasObservers { get; }
             bool HasObservers { get; }
-            void Unsubscribe(IObserver<T> observer);
         }
         }
 
 
         private abstract class ReplayBase : IReplaySubjectImplementation
         private abstract class ReplayBase : IReplaySubjectImplementation
@@ -265,8 +264,10 @@ namespace System.Reactive.Subjects
                 }
                 }
 
 
                 if (o != null)
                 if (o != null)
+                {
                     foreach (var observer in o)
                     foreach (var observer in o)
                         observer.EnsureActive();
                         observer.EnsureActive();
+                }
             }
             }
 
 
             public void OnError(Exception error)
             public void OnError(Exception error)
@@ -294,8 +295,10 @@ namespace System.Reactive.Subjects
                 }
                 }
 
 
                 if (o != null)
                 if (o != null)
+                {
                     foreach (var observer in o)
                     foreach (var observer in o)
                         observer.EnsureActive();
                         observer.EnsureActive();
+                }
             }
             }
 
 
             public void OnCompleted()
             public void OnCompleted()
@@ -319,8 +322,10 @@ namespace System.Reactive.Subjects
                 }
                 }
 
 
                 if (o != null)
                 if (o != null)
+                {
                     foreach (var observer in o)
                     foreach (var observer in o)
                         observer.EnsureActive();
                         observer.EnsureActive();
+                }
             }
             }
 
 
             public IDisposable Subscribe(IObserver<T> observer)
             public IDisposable Subscribe(IObserver<T> observer)
@@ -332,7 +337,7 @@ namespace System.Reactive.Subjects
 
 
                 var n = 0;
                 var n = 0;
 
 
-                var subscription = new RemovableDisposable(this, so);
+                var subscription = new Subscription(this, so);
                 lock (_gate)
                 lock (_gate)
                 {
                 {
                     CheckDisposed();
                     CheckDisposed();
@@ -408,8 +413,6 @@ namespace System.Reactive.Subjects
             {
             {
                 lock (_gate)
                 lock (_gate)
                 {
                 {
-                    observer.Dispose();
-
                     if (!_isDisposed)
                     if (!_isDisposed)
                     {
                     {
                         _observers = _observers.Remove(observer);
                         _observers = _observers.Remove(observer);
@@ -417,18 +420,12 @@ namespace System.Reactive.Subjects
                 }
                 }
             }
             }
 
 
-            void IReplaySubjectImplementation.Unsubscribe(IObserver<T> observer)
-            {
-                var so = (IScheduledObserver<T>)observer;
-                Unsubscribe(so);
-            }
-
-            private sealed class RemovableDisposable : IDisposable
+            private sealed class Subscription : IDisposable
             {
             {
                 private readonly ReplayBase _subject;
                 private readonly ReplayBase _subject;
                 private readonly IScheduledObserver<T> _observer;
                 private readonly IScheduledObserver<T> _observer;
 
 
-                public RemovableDisposable(ReplayBase subject, IScheduledObserver<T> observer)
+                public Subscription(ReplayBase subject, IScheduledObserver<T> observer)
                 {
                 {
                     _subject = subject;
                     _subject = subject;
                     _observer = observer;
                     _observer = observer;
@@ -554,21 +551,27 @@ namespace System.Reactive.Subjects
                 //
                 //
             }
             }
 
 
-            protected override void AddValueToBuffer(T value)
+            protected override void Next(T value)
             {
             {
                 _hasValue = true;
                 _hasValue = true;
                 _value = value;
                 _value = value;
             }
             }
 
 
-            protected override void ReplayBuffer(IObserver<T> observer)
+            protected override int Replay(IObserver<T> observer)
             {
             {
+                var n = 0;
+
                 if (_hasValue)
                 if (_hasValue)
+                {
+                    n = 1;
                     observer.OnNext(_value);
                     observer.OnNext(_value);
+                }
+
+                return n;
             }
             }
 
 
-            protected override void Dispose(bool disposing)
+            protected override void DisposeCore()
             {
             {
-                base.Dispose(disposing);
                 _value = default(T);
                 _value = default(T);
             }
             }
         }
         }
@@ -585,8 +588,8 @@ namespace System.Reactive.Subjects
 
 
             protected override void Trim()
             protected override void Trim()
             {
             {
-                while (Queue.Count > _bufferSize)
-                    Queue.Dequeue();
+                while (_queue.Count > _bufferSize)
+                    _queue.Dequeue();
             }
             }
         }
         }
 
 
@@ -605,230 +608,318 @@ namespace System.Reactive.Subjects
             }
             }
         }
         }
 
 
-        private abstract class ReplayBufferBase : IReplaySubjectImplementation
+        private abstract class ReplayBufferBase : ReplayBase
         {
         {
-            private readonly object _gate = new object();
-            private bool _isDisposed;
-            private bool _isStopped;
-            private Exception _error;
-            private ImmutableList<IObserver<T>> _observers;
+            protected override IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer)
+            {
+                return new FastImmediateObserver<T>(observer);
+            }
 
 
-            protected ReplayBufferBase()
+            protected override void DisposeCore()
             {
             {
-                _observers = ImmutableList<IObserver<T>>.Empty;
             }
             }
+        }
 
 
-            protected abstract void Trim();
-            protected abstract void AddValueToBuffer(T value);
-            protected abstract void ReplayBuffer(IObserver<T> observer);
+        private abstract class ReplayManyBase : ReplayBufferBase, IReplaySubjectImplementation
+        {
+            protected readonly Queue<T> _queue;
 
 
-            public bool HasObservers
+            protected ReplayManyBase(int queueSize)
+                : base()
             {
             {
-                get
-                {
-                    var observers = _observers;
-                    return observers != null && observers.Data.Length > 0;
-                }
+                _queue = new Queue<T>(Math.Min(queueSize, 64));
             }
             }
 
 
-            public void OnNext(T value)
+            protected override void Next(T value)
             {
             {
-                lock (_gate)
-                {
-                    CheckDisposed();
+                _queue.Enqueue(value);
+            }
 
 
-                    if (!_isStopped)
-                    {
-                        AddValueToBuffer(value);
-                        Trim();
+            protected override int Replay(IObserver<T> observer)
+            {
+                var n = _queue.Count;
 
 
-                        var o = _observers.Data;
-                        foreach (var observer in o)
-                            observer.OnNext(value);
-                    }
-                }
+                foreach (var item in _queue)
+                    observer.OnNext(item);
+
+                return n;
             }
             }
 
 
-            public void OnError(Exception error)
+            protected override void DisposeCore()
             {
             {
-                if (error == null)
-                    throw new ArgumentNullException("error");
+                _queue.Clear();
+            }
+        }
+    }
 
 
-                lock (_gate)
-                {
-                    CheckDisposed();
+    /// <summary>
+    /// Specialized scheduled observer similar to a scheduled observer for the immediate scheduler.
+    /// </summary>
+    /// <typeparam name="T">Type of the elements processed by the observer.</typeparam>
+    class FastImmediateObserver<T> : IScheduledObserver<T>
+    {
+        /// <summary>
+        /// Gate to control ownership transfer and protect data structures.
+        /// </summary>
+        private readonly object _gate = new object();
 
 
-                    if (!_isStopped)
-                    {
-                        _isStopped = true;
-                        _error = error;
-                        Trim();
+        /// <summary>
+        /// Observer to forward notifications to.
+        /// </summary>
+        private volatile IObserver<T> _observer;
 
 
-                        var o = _observers.Data;
-                        foreach (var observer in o)
-                            observer.OnError(error);
+        /// <summary>
+        /// Queue to enqueue OnNext notifications into.
+        /// </summary>
+        private Queue<T> _queue = new Queue<T>();
 
 
-                        _observers = ImmutableList<IObserver<T>>.Empty;
-                    }
-                }
-            }
+        /// <summary>
+        /// Standby queue to swap out for _queue when transferring ownership. This allows to reuse
+        /// queues in case of busy subjects where the initial replay doesn't suffice to catch up.
+        /// </summary>
+        private Queue<T> _queue2;
 
 
-            public void OnCompleted()
-            {
-                lock (_gate)
-                {
-                    CheckDisposed();
+        /// <summary>
+        /// Exception passed to an OnError notification, if any.
+        /// </summary>
+        private Exception _error;
 
 
-                    if (!_isStopped)
-                    {
-                        _isStopped = true;
-                        Trim();
+        /// <summary>
+        /// Indicates whether an OnCompleted notification was received.
+        /// </summary>
+        private bool _done;
 
 
-                        var o = _observers.Data;
-                        foreach (var observer in o)
-                            observer.OnCompleted();
+        /// <summary>
+        /// Indicates whether the observer is busy, i.e. some thread is actively draining the
+        /// notifications that were queued up.
+        /// </summary>
+        private bool _busy;
 
 
-                        _observers = ImmutableList<IObserver<T>>.Empty;
-                    }
-                }
-            }
+        /// <summary>
+        /// Indicates whether a failure occurred when the owner was draining the queue. This will
+        /// prevent future work to be processed.
+        /// </summary>
+        private bool _hasFaulted;
 
 
-            public IDisposable Subscribe(IObserver<T> observer)
-            {
-                if (observer == null)
-                    throw new ArgumentNullException("observer");
+        /// <summary>
+        /// Creates a new scheduled observer that proxies to the specified observer.
+        /// </summary>
+        /// <param name="observer">Observer to forward notifications to.</param>
+        public FastImmediateObserver(IObserver<T> observer)
+        {
+            _observer = observer;
+        }
 
 
-                var subscription = new Subscription(this, observer);
+        /// <summary>
+        /// Disposes the observer.
+        /// </summary>
+        public void Dispose()
+        {
+            Done();
+        }
 
 
-                lock (_gate)
-                {
-                    CheckDisposed();
+        /// <summary>
+        /// Notifies the observer of pending work. This will either cause the current owner to
+        /// process the newly enqueued notifications, or it will cause the calling thread to
+        /// become the owner and start processing the notification queue.
+        /// </summary>
+        public void EnsureActive()
+        {
+            EnsureActive(1);
+        }
 
 
+        /// <summary>
+        /// Notifies the observer of pending work. This will either cause the current owner to
+        /// process the newly enqueued notifications, or it will cause the calling thread to
+        /// become the owner and start processing the notification queue.
+        /// </summary>
+        /// <param name="count">The number of enqueued notifications to process (ignored).</param>
+        public void EnsureActive(int count)
+        {
+            var isOwner = false;
+
+            lock (_gate)
+            {
+                //
+                // If we failed to process work in the past, we'll simply drop it.
+                //
+                if (!_hasFaulted)
+                {
                     //
                     //
-                    // Notice the v1.x behavior of always calling Trim is preserved here.
-                    //
-                    // This may be subject (pun intended) of debate: should this policy
-                    // only be applied while the sequence is active? With the current
-                    // behavior, a sequence will "die out" after it has terminated by
-                    // continuing to drop OnNext notifications from the queue.
-                    //
-                    // In v1.x, this behavior was due to trimming based on the clock value
-                    // returned by scheduler.Now, applied to all but the terminal message
-                    // in the queue. Using the IStopwatch has the same effect. Either way,
-                    // we guarantee the final notification will be observed, but there's
-                    // no way to retain the buffer directly. One approach is to use the
-                    // time-based TakeLast operator and apply an unbounded ReplaySubject
-                    // to it.
-                    //
-                    // To conclude, we're keeping the behavior as-is for compatibility
-                    // reasons with v1.x.
+                    // If no-one is processing the notification queue, become the owner.
                     //
                     //
-                    Trim();
-                    _observers = _observers.Add(observer);
-
-                    ReplayBuffer(observer);
-
-                    if (_error != null)
+                    if (!_busy)
                     {
                     {
-                        observer.OnError(_error);
-                    }
-                    else if (_isStopped)
-                    {
-                        observer.OnCompleted();
+                        isOwner = true;
+                        _busy = true;
                     }
                     }
                 }
                 }
-
-                return subscription;
             }
             }
 
 
-            public void Unsubscribe(IObserver<T> observer)
+            if (isOwner)
             {
             {
-                lock (_gate)
+                while (true)
                 {
                 {
-                    if (!_isDisposed)
+                    var queue = default(Queue<T>);
+                    var error = default(Exception);
+                    var done = false;
+
+                    //
+                    // Steal notifications from the producer side to drain them to the observer.
+                    //
+                    lock (_gate)
                     {
                     {
-                        _observers = _observers.Remove(observer);
+                        //
+                        // Do we have any OnNext notifications to process?
+                        //
+                        if (_queue.Count > 0)
+                        {
+                            if (_queue2 == null)
+                            {
+                                _queue2 = new Queue<T>();
+                            }
+
+                            //
+                            // Swap out the current queue for a fresh or recycled one. The standby
+                            // queue is set to null; when notifications are sent out the processed
+                            // queue will become the new standby.
+                            //
+                            queue = _queue;
+                            _queue = _queue2;
+                            _queue2 = null;
+                        }
+
+                        //
+                        // Do we have any terminal notifications to process?
+                        //
+                        if (_error != null)
+                        {
+                            error = _error;
+                        }
+                        else if (_done)
+                        {
+                            done = true;
+                        }
+                        else if (queue == null)
+                        {
+                            //
+                            // No work left; quit the loop and let another thread become the
+                            // owner in the future.
+                            //
+                            _busy = false;
+                            break;
+                        }
                     }
                     }
-                }
-            }
 
 
-            private void CheckDisposed()
-            {
-                if (_isDisposed)
-                    throw new ObjectDisposedException(string.Empty);
-            }
+                    try
+                    {
+                        //
+                        // Process OnNext notifications, if any.
+                        //
+                        if (queue != null)
+                        {
+                            //
+                            // Drain the stolen OnNext notification queue.
+                            //
+                            while (queue.Count > 0)
+                            {
+                                _observer.OnNext(queue.Dequeue());
+                            }
+
+                            //
+                            // The queue is now empty, so we can reuse it by making it the standby
+                            // queue for a future swap.
+                            //
+                            lock (_gate)
+                            {
+                                _queue2 = queue;
+                            }
+                        }
+
+                        //
+                        // Process terminal notifications, if any. Notice we don't release ownership
+                        // after processing these notifications; we simply quit from the loop. This
+                        // will cause all processing of the scheduler observer to cease.
+                        //
+                        if (error != null)
+                        {
+                            var observer = Done();
+                            observer.OnError(error);
+                            break;
+                        }
+                        else if (done)
+                        {
+                            var observer = Done();
+                            observer.OnCompleted();
+                            break;
+                        }
+                    }
+                    catch
+                    {
+                        lock (_gate)
+                        {
+                            _hasFaulted = true;
+                            _queue.Clear();
+                        }
 
 
-            public void Dispose()
-            {
-                Dispose(true);
+                        throw;
+                    }
+                }
             }
             }
+        }
 
 
-            protected virtual void Dispose(bool disposing)
+        /// <summary>
+        /// Enqueues an OnCompleted notification.
+        /// </summary>
+        public void OnCompleted()
+        {
+            lock (_gate)
             {
             {
-                lock (_gate)
+                if (!_hasFaulted)
                 {
                 {
-                    _isDisposed = true;
-                    _observers = null;
+                    _done = true;
                 }
                 }
             }
             }
         }
         }
 
 
-        private abstract class ReplayManyBase : ReplayBufferBase, IReplaySubjectImplementation
+        /// <summary>
+        /// Enqueues an OnError notification.
+        /// </summary>
+        /// <param name="error">Error of the notification.</param>
+        public void OnError(Exception error)
         {
         {
-            private readonly Queue<T> _queue;
-
-            protected ReplayManyBase(int queueSize)
-                : base()
-            {
-                _queue = new Queue<T>(queueSize);
-            }
-
-            protected Queue<T> Queue
+            lock (_gate)
             {
             {
-                get
+                if (!_hasFaulted)
                 {
                 {
-                    return _queue;
+                    _error = error;
                 }
                 }
             }
             }
-
-            protected override void AddValueToBuffer(T value)
-            {
-                _queue.Enqueue(value);
-            }
-
-            protected override void ReplayBuffer(IObserver<T> observer)
-            {
-                foreach (var item in _queue)
-                    observer.OnNext(item);
-            }
-
-            protected override void Dispose(bool disposing)
-            {
-                base.Dispose(disposing);
-                _queue.Clear();
-            }
         }
         }
 
 
-        private class Subscription : IDisposable
+        /// <summary>
+        /// Enqueues an OnNext notification.
+        /// </summary>
+        /// <param name="value">Value of the notification.</param>
+        public void OnNext(T value)
         {
         {
-            private IReplaySubjectImplementation _subject;
-            private IObserver<T> _observer;
-
-            public Subscription(IReplaySubjectImplementation subject, IObserver<T> observer)
+            lock (_gate)
             {
             {
-                _subject = subject;
-                _observer = observer;
+                if (!_hasFaulted)
+                {
+                    _queue.Enqueue(value);
+                }
             }
             }
+        }
 
 
-            public void Dispose()
-            {
-                var observer = Interlocked.Exchange(ref _observer, null);
-                if (observer == null)
-                    return;
-
-                _subject.Unsubscribe(observer);
-                _subject = null;
-            }
+        /// <summary>
+        /// Terminates the observer upon receiving terminal notifications, thus preventing
+        /// future notifications to go out.
+        /// </summary>
+        /// <returns>Observer to send terminal notifications to.</returns>
+        private IObserver<T> Done()
+        {
+            return Interlocked.Exchange(ref _observer, NopObserver<T>.Instance);
         }
         }
     }
     }
 }
 }

+ 81 - 0
Rx.NET/Source/Tests.System.Reactive/Stress/Linq/ReplaySubject.cs

@@ -0,0 +1,81 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if STRESS
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Reactive.Linq;
+using System.Reactive.Subjects;
+using System.Threading.Tasks;
+
+namespace ReactiveTests.Stress.Linq
+{
+    public class ReplaySubject
+    {
+        /// <summary>
+        /// Tests the ReplaySubject with concurrent subscribers.
+        /// </summary>
+        public static void ConcurrentSubscribers()
+        {
+            var N = int.MaxValue;
+            var M = int.MaxValue;
+
+            var r = new ReplaySubject<int>(4);
+
+            var rnd = new Random();
+            var ts = new List<Task>();
+
+            for (var i = 0; i < 16; i++)
+            {
+                var rnd2 = new Random(rnd.Next());
+
+                ts.Add(Task.Run(async () =>
+                {
+                    var n = rnd2.Next(10, 1000);
+
+                    for (var j = 0; j < M; j++)
+                    {
+                        var xs = new List<int>();
+                        await r.Take(n).Scan((x1, x2) =>
+                        {
+                            if (x2 - x1 != 1)
+                                Debugger.Break();
+
+                            if (x2 == 0)
+                                Debugger.Break();
+
+                            return x2;
+                        }).ForEachAsync(xs.Add);
+
+                        var f = xs.First();
+                        if (!xs.SequenceEqual(Enumerable.Range(f, xs.Count)))
+                        {
+                            Console.WriteLine("FAIL!");
+                            Debugger.Break();
+                        }
+                        else
+                        {
+                            Console.Write(".");
+                        }
+
+                        if (j % 1000 == 0)
+                        {
+                            await Task.Delay(50);
+                        }
+                    }
+                }));
+            }
+
+            for (var i = 0; i < N; i++)
+            {
+                r.OnNext(i);
+            }
+
+            Console.WriteLine("Done!");
+
+            Task.WaitAll(ts.ToArray());
+        }
+    }
+}
+#endif

+ 1 - 0
Rx.NET/Source/Tests.System.Reactive/Tests.System.Reactive.csproj

@@ -63,6 +63,7 @@
     <Compile Include="Stress\Linq\FromEvent.cs" />
     <Compile Include="Stress\Linq\FromEvent.cs" />
     <Compile Include="Stress\Helpers.cs" />
     <Compile Include="Stress\Helpers.cs" />
     <Compile Include="Stress\Linq\Replay.cs" />
     <Compile Include="Stress\Linq\Replay.cs" />
+    <Compile Include="Stress\Linq\ReplaySubject.cs" />
     <Compile Include="TestBase.cs" />
     <Compile Include="TestBase.cs" />
     <Compile Include="TestLongRunningScheduler.cs" />
     <Compile Include="TestLongRunningScheduler.cs" />
     <Compile Include="Tests\Aliases.cs" />
     <Compile Include="Tests\Aliases.cs" />

+ 237 - 0
Rx.NET/Source/Tests.System.Reactive/Tests/Linq/Subjects/ReplaySubjectTest.cs

@@ -1,13 +1,20 @@
 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
 
 
 using System;
 using System;
+using System.Collections.Generic;
+using System.Linq;
 using System.Reactive;
 using System.Reactive;
 using System.Reactive.Concurrency;
 using System.Reactive.Concurrency;
 using System.Reactive.Subjects;
 using System.Reactive.Subjects;
+using System.Threading;
 using Microsoft.Reactive.Testing;
 using Microsoft.Reactive.Testing;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using ReactiveTests.Dummies;
 using ReactiveTests.Dummies;
 
 
+#if !NO_TPL
+using System.Threading.Tasks;
+#endif
+
 namespace ReactiveTests.Tests
 namespace ReactiveTests.Tests
 {
 {
     [TestClass]
     [TestClass]
@@ -1792,5 +1799,235 @@ namespace ReactiveTests.Tests
             Assert.AreEqual(NotificationKind.OnError, observer.Messages[3].Value.Kind);
             Assert.AreEqual(NotificationKind.OnError, observer.Messages[3].Value.Kind);
             Assert.AreEqual(expectedException, observer.Messages[3].Value.Exception);
             Assert.AreEqual(expectedException, observer.Messages[3].Value.Exception);
         }
         }
+
+        [TestMethod]
+        public void ReplaySubject_Reentrant()
+        {
+            var r = new ReplaySubject<int>(4);
+
+            r.OnNext(0);
+            r.OnNext(1);
+            r.OnNext(2);
+            r.OnNext(3);
+            r.OnNext(4);
+
+            var xs = new List<int>();
+
+            var i = 0;
+            r.Subscribe(x =>
+            {
+                xs.Add(x);
+
+                if (++i <= 10)
+                {
+                    r.OnNext(x);
+                }
+            });
+
+            r.OnNext(5);
+
+            Assert.IsTrue(xs.SequenceEqual(new[]
+            {
+                1, 2, 3, 4, // original
+                1, 2, 3, 4, // reentrant (+ fed back)
+                1, 2, 3, 4, // reentrant (+ first two fed back)
+                1, 2,       // reentrant
+                5           // tune in
+            }));
+        }
+
+        [TestMethod]
+        public void FastImmediateObserver_Simple1()
+        {
+            var res = FastImmediateObserverTest(fio =>
+            {
+                fio.OnNext(1);
+                fio.OnNext(2);
+                fio.OnNext(3);
+                fio.OnCompleted();
+
+                fio.EnsureActive(4);
+            });
+
+            res.AssertEqual(
+                OnNext(0, 1),
+                OnNext(1, 2),
+                OnNext(2, 3),
+                OnCompleted<int>(3)
+            );
+        }
+
+        [TestMethod]
+        public void FastImmediateObserver_Simple2()
+        {
+            var ex = new Exception();
+
+            var res = FastImmediateObserverTest(fio =>
+            {
+                fio.OnNext(1);
+                fio.OnNext(2);
+                fio.OnNext(3);
+                fio.OnError(ex);
+
+                fio.EnsureActive(4);
+            });
+
+            res.AssertEqual(
+                OnNext(0, 1),
+                OnNext(1, 2),
+                OnNext(2, 3),
+                OnError<int>(3, ex)
+            );
+        }
+
+        [TestMethod]
+        public void FastImmediateObserver_Simple3()
+        {
+            var res = FastImmediateObserverTest(fio =>
+            {
+                fio.OnNext(1);
+                fio.EnsureActive();
+
+                fio.OnNext(2);
+                fio.EnsureActive();
+
+                fio.OnNext(3);
+                fio.EnsureActive();
+
+                fio.OnCompleted();
+                fio.EnsureActive();
+            });
+
+            res.AssertEqual(
+                OnNext(0, 1),
+                OnNext(1, 2),
+                OnNext(2, 3),
+                OnCompleted<int>(3)
+            );
+        }
+
+        [TestMethod]
+        public void FastImmediateObserver_Fault()
+        {
+            var xs = new List<int>();
+
+            var o = Observer.Create<int>(
+                x => { xs.Add(x); if (x == 2) throw new Exception(); },
+                ex => { },
+                () => { }
+            );
+
+            var fio = new FastImmediateObserver<int>(o);
+
+            fio.OnNext(1);
+            fio.OnNext(2);
+            fio.OnNext(3);
+
+            ReactiveAssert.Throws<Exception>(() => fio.EnsureActive());
+
+            fio.OnNext(4);
+            fio.EnsureActive();
+
+            fio.OnNext(2);
+            fio.EnsureActive();
+
+            Assert.IsTrue(xs.Count == 2);
+        }
+
+#if !NO_TPL
+        [TestMethod]
+        public void FastImmediateObserver_Ownership1()
+        {
+            var xs = new List<int>();
+
+            var o = Observer.Create<int>(
+                xs.Add,
+                ex => { },
+                () => { }
+            );
+
+            var fio = new FastImmediateObserver<int>(o);
+
+            var ts = new Task[16];
+            var N = 100;
+
+            for (var i = 0; i < ts.Length; i++)
+            {
+                var j = i;
+
+                ts[i] = Task.Factory.StartNew(() =>
+                {
+                    for (var k = 0; k < N; k++)
+                    {
+                        fio.OnNext(j * N + k);
+                    }
+
+                    fio.EnsureActive(N);
+                });
+            }
+
+            Task.WaitAll(ts);
+
+            Assert.IsTrue(xs.Count == ts.Length * N);
+        }
+
+        [TestMethod]
+        public void FastImmediateObserver_Ownership2()
+        {
+            var cd = new CountdownEvent(3);
+
+            var w = new ManualResetEvent(false);
+            var e = new ManualResetEvent(false);
+
+            var xs = new List<int>();
+
+            var o = Observer.Create<int>(
+                x => { xs.Add(x); w.Set(); e.WaitOne(); cd.Signal(); },
+                ex => { },
+                () => { }
+            );
+
+            var fio = new FastImmediateObserver<int>(o);
+
+            fio.OnNext(1);
+
+            var t = Task.Factory.StartNew(() =>
+            {
+                fio.EnsureActive();
+            });
+
+            w.WaitOne();
+
+            fio.OnNext(2);
+            fio.OnNext(3);
+
+            fio.EnsureActive(2);
+
+            e.Set();
+
+            cd.Wait();
+
+            Assert.IsTrue(xs.Count == 3);
+        }
+#endif
+
+        private IEnumerable<Recorded<Notification<int>>> FastImmediateObserverTest(Action<IScheduledObserver<int>> f)
+        {
+            var ns = new List<Recorded<Notification<int>>>();
+
+            var l = 0L;
+
+            var o = Observer.Create<int>(
+                x => { ns.Add(OnNext<int>(l++, x)); },
+                ex => { ns.Add(OnError<int>(l++, ex)); },
+                () => { ns.Add(OnCompleted<int>(l++)); }
+            );
+
+            var fio = new FastImmediateObserver<int>(o);
+
+            f(fio);
+
+            return ns;
+        }
     }
     }
 }
 }