// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

using System.Reactive.Disposables;

namespace System.Reactive
{
    /// <summary>
    /// Observer wrapper that forwards notifications to an inner observer and calls
    /// <c>Dispose()</c> on itself when the sequence terminates or when the inner
    /// observer throws from one of its callbacks.
    /// </summary>
    /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
    class AutoDetachObserver<T> : ObserverBase<T>
    {
        // Downstream observer that receives every forwarded notification.
        private readonly IObserver<T> _observer;

        // Holder for the resource assigned through the Disposable property; it is
        // disposed from Dispose(bool) below.
        private readonly SingleAssignmentDisposable _subscription = new SingleAssignmentDisposable();

        /// <summary>
        /// Creates a wrapper around the given observer.
        /// </summary>
        /// <param name="observer">Observer that notifications are forwarded to.</param>
        public AutoDetachObserver(IObserver<T> observer)
        {
            _observer = observer;
        }

        /// <summary>
        /// Assigns the resource held by this observer's internal
        /// <see cref="SingleAssignmentDisposable"/>. Write-only.
        /// </summary>
        public IDisposable Disposable
        {
            set
            {
                _subscription.Disposable = value;
            }
        }

        /// <summary>
        /// Forwards an element to the wrapped observer; if that call throws, this
        /// observer is disposed before the exception continues to propagate.
        /// </summary>
        /// <param name="value">Element to forward.</param>
        protected override void OnNextCore(T value)
        {
            //
            // The pipeline has to be protected against rogue observers, otherwise
            // resources are not cleaned up properly. Consider the following example:
            //
            //   var xs  = Observable.Interval(TimeSpan.FromSeconds(1));
            //   var ys  = <some random sequence>;
            //   var res = xs.CombineLatest(ys, (x, y) => x + y);
            //
            // The marble diagram of the query above looks as follows:
            //
            //   xs  -----0-----1-----2-----3-----4-----5-----6-----7-----8-----9---...
            //                  |     |     |     |     |     |     |     |     |
            //   ys  --------4--+--5--+-----+--2--+--1--+-----+-----+--0--+-----+---...
            //               |  |  |  |     |  |  |  |  |     |     |  |  |     |
            //               v  v  v  v     v  v  v  v  v     v     v  v  v     v
            //   res --------4--5--6--7-----8--5--6--5--6-----7-----8--7--8-----9---...
            //                                 |
            //                                @#&
            //
            // Notice the free-threaded nature of Rx: messages on the resulting sequence
            // are produced by either of the two input sequences to CombineLatest.
            //
            // Now assume the OnNext callback of the observer of res throws at the point
            // marked with @#& above. The callback runs in the context of ys, so the
            // exception takes down the scheduler thread of ys. That is a problem by
            // itself (one that can be mitigated by a Catch operator on IScheduler), but
            // notice that the timer producing xs is kept alive as well.
            //
            // The guard below makes sure the acquired resources are disposed when the
            // user callback throws. A flag with try/finally is used rather than
            // catch/rethrow, so the exception itself is never intercepted here.
            //
            var delivered = false;
            try
            {
                _observer.OnNext(value);
                delivered = true;
            }
            finally
            {
                if (!delivered)
                {
                    Dispose();
                }
            }
        }

        /// <summary>
        /// Forwards the error to the wrapped observer and then disposes this observer,
        /// whether or not the wrapped observer's callback throws.
        /// </summary>
        /// <param name="exception">Error to forward.</param>
        protected override void OnErrorCore(Exception exception)
        {
            try
            {
                _observer.OnError(exception);
            }
            finally
            {
                Dispose();
            }
        }

        /// <summary>
        /// Forwards completion to the wrapped observer and then disposes this observer,
        /// whether or not the wrapped observer's callback throws.
        /// </summary>
        protected override void OnCompletedCore()
        {
            try
            {
                _observer.OnCompleted();
            }
            finally
            {
                Dispose();
            }
        }

        /// <summary>
        /// Runs the base class disposal logic and, when <paramref name="disposing"/> is
        /// true, also disposes the internally held <see cref="SingleAssignmentDisposable"/>.
        /// </summary>
        /// <param name="disposing">Value passed through to the base class; the held
        /// disposable is only released when it is true.</param>
        protected override void Dispose(bool disposing)
        {
            base.Dispose(disposing);

            if (disposing)
            {
                _subscription.Dispose();
            }
        }
    }
}