// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#if !NO_PERF
using System;
using System.Collections.Generic;
using System.Reactive.Threading;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
    class Latest<TSource> : PushToPullAdapter<TSource, TSource>
    {
        public Latest(IObservable<TSource> source)
            : base(source)
        {
        }

        protected override PushToPullSink<TSource, TSource> Run(IDisposable subscription)
        {
            return new _(subscription);
        }

        class _ : PushToPullSink<TSource, TSource>
        {
            // Guards the notification fields below.
            private readonly object _gate;

            // Signaled when a notification is available. The maximum count is 1
            // because only the most recent notification is retained.
#if !NO_CDS
            private readonly SemaphoreSlim _semaphore;
#else
            private readonly Semaphore _semaphore;
#endif

            public _(IDisposable subscription)
                : base(subscription)
            {
                _gate = new object();

#if !NO_CDS
                _semaphore = new SemaphoreSlim(0, 1);
#else
                _semaphore = new Semaphore(0, 1);
#endif
            }

            private bool _notificationAvailable;
            private NotificationKind _kind;
            private TSource _value;
            private Exception _error;

            public override void OnNext(TSource value)
            {
                var lackedValue = false;

                lock (_gate)
                {
                    // Overwrite any pending notification; release the semaphore
                    // only if the consumer had nothing to pick up before.
                    lackedValue = !_notificationAvailable;
                    _notificationAvailable = true;
                    _kind = NotificationKind.OnNext;
                    _value = value;
                }

                if (lackedValue)
                    _semaphore.Release();
            }

            public override void OnError(Exception error)
            {
                base.Dispose();

                var lackedValue = false;

                lock (_gate)
                {
                    lackedValue = !_notificationAvailable;
                    _notificationAvailable = true;
                    _kind = NotificationKind.OnError;
                    _error = error;
                }

                if (lackedValue)
                    _semaphore.Release();
            }

            public override void OnCompleted()
            {
                base.Dispose();

                var lackedValue = false;

                lock (_gate)
                {
                    lackedValue = !_notificationAvailable;
                    _notificationAvailable = true;
                    _kind = NotificationKind.OnCompleted;
                }

                if (lackedValue)
                    _semaphore.Release();
            }

            public override bool TryMoveNext(out TSource current)
            {
                var kind = default(NotificationKind);
                var value = default(TSource);
                var error = default(Exception);

                // Block until a notification has been published.
#if !NO_CDS
                _semaphore.Wait();
#else
                _semaphore.WaitOne();
#endif

                lock (_gate)
                {
                    // Copy the pending notification into locals while holding the lock.
                    kind = _kind;

                    switch (kind)
                    {
                        case NotificationKind.OnNext:
                            value = _value;
                            break;
                        case NotificationKind.OnError:
                            error = _error;
                            break;
                    }

                    _notificationAvailable = false;
                }

                switch (kind)
                {
                    case NotificationKind.OnNext:
                        // Return the copy taken under the lock; re-reading _value here
                        // could race with a concurrent OnNext.
                        current = value;
                        return true;
                    case NotificationKind.OnError:
                        error.Throw();
                        break;
                    case NotificationKind.OnCompleted:
                        break;
                }

                current = default(TSource);
                return false;
            }
        }
    }
}
#endif