| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709 | // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.#if !NO_PERFusing System;using System.Collections.Generic;using System.Diagnostics;using System.Reactive.Concurrency;using System.Reactive.Disposables;using System.Threading;namespace System.Reactive.Linq.ObservableImpl{    class Buffer<TSource> : Producer<IList<TSource>>    {        private readonly IObservable<TSource> _source;        private readonly int _count;        private readonly int _skip;        private readonly TimeSpan _timeSpan;        private readonly TimeSpan _timeShift;        private readonly IScheduler _scheduler;        public Buffer(IObservable<TSource> source, int count, int skip)        {            _source = source;            _count = count;            _skip = skip;        }        public Buffer(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)        {            _source = source;            _timeSpan = timeSpan;            _timeShift = timeShift;            _scheduler = scheduler;        }        public Buffer(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)        {            _source = source;            _timeSpan = timeSpan;            _count = count;            _scheduler = scheduler;        }        protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)        {            if (_scheduler == null)            {                var sink = new _(this, observer, cancel);                setSink(sink);                return sink.Run();            }            else if (_count > 0)            {                var sink = new Impl(this, observer, cancel);                setSink(sink);                return sink.Run();            }            else            {                if (_timeSpan == _timeShift)                {                    var sink = new BufferTimeShift(this, observer, cancel);                    setSink(sink);                    return sink.Run();                }                else                {                    var sink = new BufferImpl(this, observer, cancel);                    setSink(sink);                    return sink.Run();                }            }        }        class _ : Sink<IList<TSource>>, IObserver<TSource>        {            private readonly Buffer<TSource> _parent;            public _(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)                : base(observer, cancel)            {                _parent = parent;            }            private Queue<IList<TSource>> _queue;            private int _n;            public IDisposable Run()            {                _queue = new Queue<IList<TSource>>();                _n = 0;                CreateWindow();                return _parent._source.SubscribeSafe(this);            }            private void CreateWindow()            {                var s = new List<TSource>();                _queue.Enqueue(s);            }            public void OnNext(TSource value)            {                foreach (var s in _queue)                    s.Add(value);                var c = _n - _parent._count + 1;                if (c >= 0 && c % _parent._skip == 0)                {                    var s = _queue.Dequeue();                    if (s.Count > 0)                        base._observer.OnNext(s);                }                _n++;                if (_n % _parent._skip == 0)                    CreateWindow();            }            public void OnError(Exception error)            {                while (_queue.Count > 0)                    _queue.Dequeue().Clear();                base._observer.OnError(error);                base.Dispose();            }            public void OnCompleted()            {                while (_queue.Count > 0)                {                    var s = _queue.Dequeue();                    if (s.Count > 0)                        base._observer.OnNext(s);                }                base._observer.OnCompleted();                base.Dispose();            }        }        class BufferImpl : Sink<IList<TSource>>, IObserver<TSource>        {            private readonly Buffer<TSource> _parent;            public BufferImpl(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)                : base(observer, cancel)            {                _parent = parent;            }            private TimeSpan _totalTime;            private TimeSpan _nextShift;            private TimeSpan _nextSpan;            private object _gate;            private Queue<List<TSource>> _q;            private SerialDisposable _timerD;            public IDisposable Run()            {                _totalTime = TimeSpan.Zero;                _nextShift = _parent._timeShift;                _nextSpan = _parent._timeSpan;                _gate = new object();                _q = new Queue<List<TSource>>();                _timerD = new SerialDisposable();                CreateWindow();                CreateTimer();                var subscription = _parent._source.SubscribeSafe(this);                return StableCompositeDisposable.Create(_timerD, subscription);            }            private void CreateWindow()            {                var s = new List<TSource>();                _q.Enqueue(s);            }            private void CreateTimer()            {                var m = new SingleAssignmentDisposable();                _timerD.Disposable = m;                var isSpan = false;                var isShift = false;                if (_nextSpan == _nextShift)                {                    isSpan = true;                    isShift = true;                }                else if (_nextSpan < _nextShift)                    isSpan = true;                else                    isShift = true;                var newTotalTime = isSpan ? _nextSpan : _nextShift;                var ts = newTotalTime - _totalTime;                _totalTime = newTotalTime;                if (isSpan)                    _nextSpan += _parent._timeShift;                if (isShift)                    _nextShift += _parent._timeShift;                m.Disposable = _parent._scheduler.Schedule(new State { isSpan = isSpan, isShift = isShift }, ts, Tick);            }            struct State            {                public bool isSpan;                public bool isShift;            }            private IDisposable Tick(IScheduler self, State state)            {                lock (_gate)                {                    //                    // Before v2, the two operations below were reversed. This doesn't have an observable                    // difference for Buffer, but is done to keep code consistent with Window, where we                    // took a breaking change in v2 to ensure consistency across overloads. For more info,                    // see the comment in Tick for Window.                    //                    if (state.isSpan)                    {                        var s = _q.Dequeue();                        base._observer.OnNext(s);                    }                    if (state.isShift)                    {                        CreateWindow();                    }                }                CreateTimer();                return Disposable.Empty;            }            public void OnNext(TSource value)            {                lock (_gate)                {                    foreach (var s in _q)                        s.Add(value);                }            }            public void OnError(Exception error)            {                lock (_gate)                {                    while (_q.Count > 0)                        _q.Dequeue().Clear();                    base._observer.OnError(error);                    base.Dispose();                }            }            public void OnCompleted()            {                lock (_gate)                {                    while (_q.Count > 0)                        base._observer.OnNext(_q.Dequeue());                    base._observer.OnCompleted();                    base.Dispose();                }            }        }        class BufferTimeShift : Sink<IList<TSource>>, IObserver<TSource>        {            private readonly Buffer<TSource> _parent;            public BufferTimeShift(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)                : base(observer, cancel)            {                _parent = parent;            }            private object _gate;            private List<TSource> _list;            public IDisposable Run()            {                _gate = new object();                _list = new List<TSource>();                var d = _parent._scheduler.SchedulePeriodic(_parent._timeSpan, Tick);                var s = _parent._source.SubscribeSafe(this);                return StableCompositeDisposable.Create(d, s);            }            private void Tick()            {                lock (_gate)                {                    base._observer.OnNext(_list);                    _list = new List<TSource>();                }            }            public void OnNext(TSource value)            {                lock (_gate)                {                    _list.Add(value);                }            }            public void OnError(Exception error)            {                lock (_gate)                {                    _list.Clear();                    base._observer.OnError(error);                    base.Dispose();                }            }            public void OnCompleted()            {                lock (_gate)                {                    base._observer.OnNext(_list);                    base._observer.OnCompleted();                    base.Dispose();                }            }        }        class Impl : Sink<IList<TSource>>, IObserver<TSource>        {            private readonly Buffer<TSource> _parent;            public Impl(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)                : base(observer, cancel)            {                _parent = parent;            }            private object _gate;            private IList<TSource> _s;            private int _n;            private int _windowId;            private SerialDisposable _timerD;            public IDisposable Run()            {                _gate = new object();                _s = default(IList<TSource>);                _n = 0;                _windowId = 0;                _timerD = new SerialDisposable();                _s = new List<TSource>();                CreateTimer(0);                var subscription = _parent._source.SubscribeSafe(this);                return StableCompositeDisposable.Create(_timerD, subscription);            }            private void CreateTimer(int id)            {                var m = new SingleAssignmentDisposable();                _timerD.Disposable = m;                m.Disposable = _parent._scheduler.Schedule(id, _parent._timeSpan, Tick);            }            private IDisposable Tick(IScheduler self, int id)            {                var d = Disposable.Empty;                var newId = 0;                lock (_gate)                {                    if (id != _windowId)                        return d;                    _n = 0;                    newId = ++_windowId;                    var res = _s;                    _s = new List<TSource>();                    base._observer.OnNext(res);                    CreateTimer(newId);                }                return d;            }            public void OnNext(TSource value)            {                var newWindow = false;                var newId = 0;                lock (_gate)                {                    _s.Add(value);                    _n++;                    if (_n == _parent._count)                    {                        newWindow = true;                        _n = 0;                        newId = ++_windowId;                        var res = _s;                        _s = new List<TSource>();                        base._observer.OnNext(res);                    }                    if (newWindow)                        CreateTimer(newId);                }            }            public void OnError(Exception error)            {                lock (_gate)                {                    _s.Clear();                    base._observer.OnError(error);                    base.Dispose();                }            }            public void OnCompleted()            {                lock (_gate)                {                    base._observer.OnNext(_s);                    base._observer.OnCompleted();                    base.Dispose();                }            }        }    }    class Buffer<TSource, TBufferClosing> : Producer<IList<TSource>>    {        private readonly IObservable<TSource> _source;        private readonly Func<IObservable<TBufferClosing>> _bufferClosingSelector;        private readonly IObservable<TBufferClosing> _bufferBoundaries;        public Buffer(IObservable<TSource> source, Func<IObservable<TBufferClosing>> bufferClosingSelector)        {            _source = source;            _bufferClosingSelector = bufferClosingSelector;        }        public Buffer(IObservable<TSource> source, IObservable<TBufferClosing> bufferBoundaries)        {            _source = source;            _bufferBoundaries = bufferBoundaries;        }        protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)        {            if (_bufferClosingSelector != null)            {                var sink = new _(this, observer, cancel);                setSink(sink);                return sink.Run();            }            else            {                var sink = new Beta(this, observer, cancel);                setSink(sink);                return sink.Run();            }        }        class _ : Sink<IList<TSource>>, IObserver<TSource>        {            private readonly Buffer<TSource, TBufferClosing> _parent;            public _(Buffer<TSource, TBufferClosing> parent, IObserver<IList<TSource>> observer, IDisposable cancel)                : base(observer, cancel)            {                _parent = parent;            }            private IList<TSource> _buffer;            private object _gate;            private AsyncLock _bufferGate;            private SerialDisposable _m;            public IDisposable Run()            {                _buffer = new List<TSource>();                _gate = new object();                _bufferGate = new AsyncLock();                _m = new SerialDisposable();                var groupDisposable = new CompositeDisposable(2) { _m };                groupDisposable.Add(_parent._source.SubscribeSafe(this));                _bufferGate.Wait(CreateBufferClose);                return groupDisposable;            }            private void CreateBufferClose()            {                var bufferClose = default(IObservable<TBufferClosing>);                try                {                    bufferClose = _parent._bufferClosingSelector();                }                catch (Exception exception)                {                    lock (_gate)                    {                        base._observer.OnError(exception);                        base.Dispose();                    }                    return;                }                var closingSubscription = new SingleAssignmentDisposable();                _m.Disposable = closingSubscription;                closingSubscription.Disposable = bufferClose.SubscribeSafe(new Omega(this, closingSubscription));            }            private void CloseBuffer(IDisposable closingSubscription)            {                closingSubscription.Dispose();                lock (_gate)                {                    var res = _buffer;                    _buffer = new List<TSource>();                    base._observer.OnNext(res);                }                _bufferGate.Wait(CreateBufferClose);            }            class Omega : IObserver<TBufferClosing>            {                private readonly _ _parent;                private readonly IDisposable _self;                public Omega(_ parent, IDisposable self)                {                    _parent = parent;                    _self = self;                }                public void OnNext(TBufferClosing value)                {                    _parent.CloseBuffer(_self);                }                public void OnError(Exception error)                {                    _parent.OnError(error);                }                public void OnCompleted()                {                    _parent.CloseBuffer(_self);                }            }            public void OnNext(TSource value)            {                lock (_gate)                {                    _buffer.Add(value);                }            }            public void OnError(Exception error)            {                lock (_gate)                {                    _buffer.Clear();                    base._observer.OnError(error);                    base.Dispose();                }            }            public void OnCompleted()            {                lock (_gate)                {                    base._observer.OnNext(_buffer);                    base._observer.OnCompleted();                    base.Dispose();                }            }        }        class Beta : Sink<IList<TSource>>, IObserver<TSource>        {            private readonly Buffer<TSource, TBufferClosing> _parent;            public Beta(Buffer<TSource, TBufferClosing> parent, IObserver<IList<TSource>> observer, IDisposable cancel)                : base(observer, cancel)            {                _parent = parent;            }            private IList<TSource> _buffer;            private object _gate;            private RefCountDisposable _refCountDisposable;            public IDisposable Run()            {                _buffer = new List<TSource>();                _gate = new object();                var d = new CompositeDisposable(2);                _refCountDisposable = new RefCountDisposable(d);                d.Add(_parent._source.SubscribeSafe(this));                d.Add(_parent._bufferBoundaries.SubscribeSafe(new Omega(this)));                return _refCountDisposable;            }            class Omega : IObserver<TBufferClosing>            {                private readonly Beta _parent;                public Omega(Beta parent)                {                    _parent = parent;                }                public void OnNext(TBufferClosing value)                {                    lock (_parent._gate)                    {                        var res = _parent._buffer;                        _parent._buffer = new List<TSource>();                        _parent._observer.OnNext(res);                    }                }                public void OnError(Exception error)                {                    _parent.OnError(error);                }                public void OnCompleted()                {                    _parent.OnCompleted();                }            }            public void OnNext(TSource value)            {                lock (_gate)                {                    _buffer.Add(value);                }            }            public void OnError(Exception error)            {                lock (_gate)                {                    _buffer.Clear();                    base._observer.OnError(error);                    base.Dispose();                }            }            public void OnCompleted()            {                lock (_gate)                {                    base._observer.OnNext(_buffer);                    base._observer.OnCompleted();                    base.Dispose();                }            }        }    }}#endif
 |