// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#if !NO_PERF
using System;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Groups the observers that run a callback for every element they receive and
    /// invoke a completion callback once when the sequence terminates.
    /// </summary>
    /// <typeparam name="TSource">Type of the elements delivered to the observers.</typeparam>
    class ForEach<TSource>
    {
        /// <summary>
        /// Observer that passes each element to an <see cref="Action{T}"/>.
        /// </summary>
        public class _ : IObserver<TSource>
        {
            private readonly Action<TSource> _onNext;
            private readonly Action _done;

            private Exception _exception;

            // 0 while elements are still accepted, 1 once a terminal notification has been taken.
            private int _stopped;

            /// <summary>
            /// Creates the observer.
            /// </summary>
            /// <param name="onNext">Callback invoked for each element received while the observer is not stopped.</param>
            /// <param name="done">Callback invoked by the first terminal notification (error or completion).</param>
            public _(Action<TSource> onNext, Action done)
            {
                _onNext = onNext;
                _done = done;

                _stopped = 0;
            }

            /// <summary>
            /// Gets the exception recorded by the terminal <see cref="OnError"/> call, or
            /// <c>null</c> when no error has been recorded.
            /// </summary>
            public Exception Error
            {
                get { return _exception; }
            }

            /// <summary>
            /// Invokes the element callback unless the observer has already stopped.
            /// An exception thrown by the callback is forwarded to <see cref="OnError"/>.
            /// </summary>
            /// <param name="value">Element to hand to the callback.</param>
            public void OnNext(TSource value)
            {
                // Plain read; the Interlocked.Exchange in OnError/OnCompleted is what
                // decides which terminal notification wins.
                if (_stopped == 0)
                {
                    try
                    {
                        _onNext(value);
                    }
                    catch (Exception ex)
                    {
                        OnError(ex);
                    }
                }
            }

            /// <summary>
            /// Stops the observer, records <paramref name="error"/> and invokes the done callback.
            /// Does nothing when a terminal notification was already taken.
            /// </summary>
            /// <param name="error">Exception to expose through <see cref="Error"/>.</param>
            public void OnError(Exception error)
            {
                // Only the caller that flips the flag from 0 to 1 proceeds.
                if (Interlocked.Exchange(ref _stopped, 1) == 0)
                {
                    _exception = error;
                    _done();
                }
            }

            /// <summary>
            /// Stops the observer and invokes the done callback.
            /// Does nothing when a terminal notification was already taken.
            /// </summary>
            public void OnCompleted()
            {
                // Only the caller that flips the flag from 0 to 1 proceeds.
                if (Interlocked.Exchange(ref _stopped, 1) == 0)
                {
                    _done();
                }
            }
        }

        /// <summary>
        /// Observer that passes each element, together with its zero-based position,
        /// to an <see cref="Action{T1, T2}"/>.
        /// </summary>
        public class ForEachImpl : IObserver<TSource>
        {
            private readonly Action<TSource, int> _onNext;
            private readonly Action _done;

            // Position handed to the callback for the next element.
            private int _index;
            private Exception _exception;

            // 0 while elements are still accepted, 1 once a terminal notification has been taken.
            private int _stopped;

            /// <summary>
            /// Creates the observer.
            /// </summary>
            /// <param name="onNext">Callback invoked with each element and its index while the observer is not stopped.</param>
            /// <param name="done">Callback invoked by the first terminal notification (error or completion).</param>
            public ForEachImpl(Action<TSource, int> onNext, Action done)
            {
                _onNext = onNext;
                _done = done;

                _index = 0;
                _stopped = 0;
            }

            /// <summary>
            /// Gets the exception recorded by the terminal <see cref="OnError"/> call, or
            /// <c>null</c> when no error has been recorded.
            /// </summary>
            public Exception Error
            {
                get { return _exception; }
            }

            /// <summary>
            /// Invokes the element callback with the current index and then advances the index,
            /// unless the observer has already stopped. An exception thrown by the callback, or
            /// by the checked index increment on overflow, is forwarded to <see cref="OnError"/>.
            /// </summary>
            /// <param name="value">Element to hand to the callback.</param>
            public void OnNext(TSource value)
            {
                // Plain read; the Interlocked.Exchange in OnError/OnCompleted is what
                // decides which terminal notification wins.
                if (_stopped == 0)
                {
                    try
                    {
                        // checked: incrementing past int.MaxValue throws instead of wrapping around.
                        _onNext(value, checked(_index++));
                    }
                    catch (Exception ex)
                    {
                        OnError(ex);
                    }
                }
            }

            /// <summary>
            /// Stops the observer, records <paramref name="error"/> and invokes the done callback.
            /// Does nothing when a terminal notification was already taken.
            /// </summary>
            /// <param name="error">Exception to expose through <see cref="Error"/>.</param>
            public void OnError(Exception error)
            {
                // Only the caller that flips the flag from 0 to 1 proceeds.
                if (Interlocked.Exchange(ref _stopped, 1) == 0)
                {
                    _exception = error;
                    _done();
                }
            }

            /// <summary>
            /// Stops the observer and invokes the done callback.
            /// Does nothing when a terminal notification was already taken.
            /// </summary>
            public void OnCompleted()
            {
                // Only the caller that flips the flag from 0 to 1 proceeds.
                if (Interlocked.Exchange(ref _stopped, 1) == 0)
                {
                    _done();
                }
            }
        }
    }
}
#endif