| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758 | // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.#if !NO_PERFusing System;using System.Collections.Generic;using System.Diagnostics;using System.Reactive.Concurrency;using System.Reactive.Disposables;using System.Reactive.Subjects;using System.Threading;namespace System.Reactive.Linq.ObservableImpl{    class Window<TSource> : Producer<IObservable<TSource>>    {        private readonly IObservable<TSource> _source;        private readonly int _count;        private readonly int _skip;        private readonly TimeSpan _timeSpan;        private readonly TimeSpan _timeShift;        private readonly IScheduler _scheduler;        public Window(IObservable<TSource> source, int count, int skip)        {            _source = source;            _count = count;            _skip = skip;        }        public Window(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)        {            _source = source;            _timeSpan = timeSpan;            _timeShift = timeShift;            _scheduler = scheduler;        }        public Window(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)        {            _source = source;            _timeSpan = timeSpan;            _count = count;            _scheduler = scheduler;        }        protected override IDisposable Run(IObserver<IObservable<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)        {            if (_scheduler == null)            {                var sink = new _(this, observer, cancel);                setSink(sink);                return sink.Run();            }            else if (_count > 0)            {                var sink = new BoundedWindowImpl(this, observer, cancel);                setSink(sink);                return sink.Run();            }            else            {                if (_timeSpan == _timeShift)                {                    var sink = new TimeShiftImpl(this, observer, cancel);                    setSink(sink);                    return sink.Run();                }                else                {                    var sink = new WindowImpl(this, observer, cancel);                    setSink(sink);                    return sink.Run();                }            }        }        class _ : Sink<IObservable<TSource>>, IObserver<TSource>        {            private readonly Window<TSource> _parent;            public _(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)                : base(observer, cancel)            {                _parent = parent;            }            private Queue<ISubject<TSource>> _queue;            private int _n;            private SingleAssignmentDisposable _m;            private RefCountDisposable _refCountDisposable;            public IDisposable Run()            {                _queue = new Queue<ISubject<TSource>>();                _n = 0;                _m = new SingleAssignmentDisposable();                _refCountDisposable = new RefCountDisposable(_m);                var firstWindow = CreateWindow();                base._observer.OnNext(firstWindow);                _m.Disposable = _parent._source.SubscribeSafe(this);                return _refCountDisposable;            }            private IObservable<TSource> CreateWindow()            {                var s = new Subject<TSource>();                _queue.Enqueue(s);                return new WindowObservable<TSource>(s, _refCountDisposable);            }            public void OnNext(TSource value)            {                foreach (var s in _queue)                    s.OnNext(value);                var c = _n - _parent._count + 1;                if (c >= 0 && c % _parent._skip == 0)                {                    var s = _queue.Dequeue();                    s.OnCompleted();                }                _n++;                if (_n % _parent._skip == 0)                {                    var newWindow = CreateWindow();                    base._observer.OnNext(newWindow);                }            }            public void OnError(Exception error)            {                while (_queue.Count > 0)                    _queue.Dequeue().OnError(error);                base._observer.OnError(error);                base.Dispose();            }            public void OnCompleted()            {                while (_queue.Count > 0)                    _queue.Dequeue().OnCompleted();                base._observer.OnCompleted();                base.Dispose();            }        }        class WindowImpl : Sink<IObservable<TSource>>, IObserver<TSource>        {            private readonly Window<TSource> _parent;            public WindowImpl(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)                : base(observer, cancel)            {                _parent = parent;            }            private TimeSpan _totalTime;            private TimeSpan _nextShift;            private TimeSpan _nextSpan;            private object _gate;            private Queue<ISubject<TSource>> _q;            private SerialDisposable _timerD;            private RefCountDisposable _refCountDisposable;            public IDisposable Run()            {                _totalTime = TimeSpan.Zero;                _nextShift = _parent._timeShift;                _nextSpan = _parent._timeSpan;                _gate = new object();                _q = new Queue<ISubject<TSource>>();                _timerD = new SerialDisposable();                var groupDisposable = new CompositeDisposable(2) { _timerD };                _refCountDisposable = new RefCountDisposable(groupDisposable);                CreateWindow();                CreateTimer();                groupDisposable.Add(_parent._source.SubscribeSafe(this));                return _refCountDisposable;            }            private void CreateWindow()            {                var s = new Subject<TSource>();                _q.Enqueue(s);                base._observer.OnNext(new WindowObservable<TSource>(s, _refCountDisposable));            }            private void CreateTimer()            {                var m = new SingleAssignmentDisposable();                _timerD.Disposable = m;                var isSpan = false;                var isShift = false;                if (_nextSpan == _nextShift)                {                    isSpan = true;                    isShift = true;                }                else if (_nextSpan < _nextShift)                    isSpan = true;                else                    isShift = true;                var newTotalTime = isSpan ? _nextSpan : _nextShift;                var ts = newTotalTime - _totalTime;                _totalTime = newTotalTime;                if (isSpan)                    _nextSpan += _parent._timeShift;                if (isShift)                    _nextShift += _parent._timeShift;                m.Disposable = _parent._scheduler.Schedule(new State { isSpan = isSpan, isShift = isShift }, ts, Tick);            }            struct State            {                public bool isSpan;                public bool isShift;            }            private IDisposable Tick(IScheduler self, State state)            {                lock (_gate)                {                    //                    // BREAKING CHANGE v2 > v1.x - Making behavior of sending OnCompleted to the window                    //                             before sending out a new window consistent across all                    //                             overloads of Window and Buffer. Before v2, the two                    //                             operations below were reversed.                    //                    if (state.isSpan)                    {                        var s = _q.Dequeue();                        s.OnCompleted();                    }                    if (state.isShift)                    {                        CreateWindow();                    }                }                CreateTimer();                return Disposable.Empty;            }            public void OnNext(TSource value)            {                lock (_gate)                {                    foreach (var s in _q)                        s.OnNext(value);                }            }            public void OnError(Exception error)            {                lock (_gate)                {                    foreach (var s in _q)                        s.OnError(error);                    base._observer.OnError(error);                    base.Dispose();                }            }            public void OnCompleted()            {                lock (_gate)                {                    foreach (var s in _q)                        s.OnCompleted();                    base._observer.OnCompleted();                    base.Dispose();                }            }        }        class TimeShiftImpl : Sink<IObservable<TSource>>, IObserver<TSource>        {            private readonly Window<TSource> _parent;            public TimeShiftImpl(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)                : base(observer, cancel)            {                _parent = parent;            }            private object _gate;            private Subject<TSource> _subject;            private RefCountDisposable _refCountDisposable;            public IDisposable Run()            {                _gate = new object();                var groupDisposable = new CompositeDisposable(2);                _refCountDisposable = new RefCountDisposable(groupDisposable);                CreateWindow();                groupDisposable.Add(_parent._scheduler.SchedulePeriodic(_parent._timeSpan, Tick));                groupDisposable.Add(_parent._source.SubscribeSafe(this));                return _refCountDisposable;            }            private void Tick()            {                lock (_gate)                {                    _subject.OnCompleted();                    CreateWindow();                }            }            private void CreateWindow()            {                _subject = new Subject<TSource>();                base._observer.OnNext(new WindowObservable<TSource>(_subject, _refCountDisposable));            }            public void OnNext(TSource value)            {                lock (_gate)                {                    _subject.OnNext(value);                }            }            public void OnError(Exception error)            {                lock (_gate)                {                    _subject.OnError(error);                    base._observer.OnError(error);                    base.Dispose();                }            }            public void OnCompleted()            {                lock (_gate)                {                    _subject.OnCompleted();                    base._observer.OnCompleted();                    base.Dispose();                }            }        }        class BoundedWindowImpl : Sink<IObservable<TSource>>, IObserver<TSource>        {            private readonly Window<TSource> _parent;            public BoundedWindowImpl(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)                : base(observer, cancel)            {                _parent = parent;            }            private object _gate;            private ISubject<TSource> _s;            private int _n;            private int _windowId;            private SerialDisposable _timerD;            private RefCountDisposable _refCountDisposable;            public IDisposable Run()            {                _gate = new object();                _s = default(ISubject<TSource>);                _n = 0;                _windowId = 0;                _timerD = new SerialDisposable();                var groupDisposable = new CompositeDisposable(2) { _timerD };                _refCountDisposable = new RefCountDisposable(groupDisposable);                _s = new Subject<TSource>();                base._observer.OnNext(new WindowObservable<TSource>(_s, _refCountDisposable));                CreateTimer(0);                groupDisposable.Add(_parent._source.SubscribeSafe(this));                return _refCountDisposable;            }            private void CreateTimer(int id)            {                var m = new SingleAssignmentDisposable();                _timerD.Disposable = m;                m.Disposable = _parent._scheduler.Schedule(id, _parent._timeSpan, Tick);            }            private IDisposable Tick(IScheduler self, int id)            {                var d = Disposable.Empty;                var newId = 0;                lock (_gate)                {                    if (id != _windowId)                        return d;                    _n = 0;                    newId = ++_windowId;                    _s.OnCompleted();                    _s = new Subject<TSource>();                    base._observer.OnNext(new WindowObservable<TSource>(_s, _refCountDisposable));                }                CreateTimer(newId);                return d;            }            public void OnNext(TSource value)            {                var newWindow = false;                var newId = 0;                lock (_gate)                {                    _s.OnNext(value);                    _n++;                    if (_n == _parent._count)                    {                        newWindow = true;                        _n = 0;                        newId = ++_windowId;                        _s.OnCompleted();                        _s = new Subject<TSource>();                        base._observer.OnNext(new WindowObservable<TSource>(_s, _refCountDisposable));                    }                }                if (newWindow)                    CreateTimer(newId);            }            public void OnError(Exception error)            {                lock (_gate)                {                    _s.OnError(error);                    base._observer.OnError(error);                    base.Dispose();                }            }            public void OnCompleted()            {                lock (_gate)                {                    _s.OnCompleted();                    base._observer.OnCompleted();                    base.Dispose();                }            }        }    }    class Window<TSource, TWindowClosing> : Producer<IObservable<TSource>>    {        private readonly IObservable<TSource> _source;        private readonly Func<IObservable<TWindowClosing>> _windowClosingSelector;        private readonly IObservable<TWindowClosing> _windowBoundaries;        public Window(IObservable<TSource> source, Func<IObservable<TWindowClosing>> windowClosingSelector)        {            _source = source;            _windowClosingSelector = windowClosingSelector;        }        public Window(IObservable<TSource> source, IObservable<TWindowClosing> windowBoundaries)        {            _source = source;            _windowBoundaries = windowBoundaries;        }        protected override IDisposable Run(IObserver<IObservable<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)        {            if (_windowClosingSelector != null)            {                var sink = new _(this, observer, cancel);                setSink(sink);                return sink.Run();            }            else            {                var sink = new Beta(this, observer, cancel);                setSink(sink);                return sink.Run();            }        }        class _ : Sink<IObservable<TSource>>, IObserver<TSource>        {            private readonly Window<TSource, TWindowClosing> _parent;            public _(Window<TSource, TWindowClosing> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)                : base(observer, cancel)            {                _parent = parent;            }            private ISubject<TSource> _window;            private object _gate;            private AsyncLock _windowGate;            private SerialDisposable _m;            private RefCountDisposable _refCountDisposable;            public IDisposable Run()            {                _window = new Subject<TSource>();                _gate = new object();                _windowGate = new AsyncLock();                _m = new SerialDisposable();                var groupDisposable = new CompositeDisposable(2) { _m };                _refCountDisposable = new RefCountDisposable(groupDisposable);                var window = new WindowObservable<TSource>(_window, _refCountDisposable);                base._observer.OnNext(window);                groupDisposable.Add(_parent._source.SubscribeSafe(this));                _windowGate.Wait(CreateWindowClose);                return _refCountDisposable;            }            private void CreateWindowClose()            {                var windowClose = default(IObservable<TWindowClosing>);                try                {                    windowClose = _parent._windowClosingSelector();                }                catch (Exception exception)                {                    lock (_gate)                    {                        base._observer.OnError(exception);                        base.Dispose();                    }                    return;                }                var closingSubscription = new SingleAssignmentDisposable();                _m.Disposable = closingSubscription;                closingSubscription.Disposable = windowClose.SubscribeSafe(new Omega(this, closingSubscription));            }            private void CloseWindow(IDisposable closingSubscription)            {                closingSubscription.Dispose();                lock (_gate)                {                    _window.OnCompleted();                    _window = new Subject<TSource>();                    var window = new WindowObservable<TSource>(_window, _refCountDisposable);                    base._observer.OnNext(window);                }                _windowGate.Wait(CreateWindowClose);            }            class Omega : IObserver<TWindowClosing>            {                private readonly _ _parent;                private readonly IDisposable _self;                public Omega(_ parent, IDisposable self)                {                    _parent = parent;                    _self = self;                }                public void OnNext(TWindowClosing value)                {                    _parent.CloseWindow(_self);                }                public void OnError(Exception error)                {                    _parent.OnError(error);                }                public void OnCompleted()                {                    _parent.CloseWindow(_self);                }            }            public void OnNext(TSource value)            {                lock (_gate)                {                    _window.OnNext(value);                }            }            public void OnError(Exception error)            {                lock (_gate)                {                    _window.OnError(error);                    base._observer.OnError(error);                    base.Dispose();                }            }            public void OnCompleted()            {                lock (_gate)                {                    _window.OnCompleted();                    base._observer.OnCompleted();                    base.Dispose();                }            }        }        class Beta : Sink<IObservable<TSource>>, IObserver<TSource>        {            private readonly Window<TSource, TWindowClosing> _parent;            public Beta(Window<TSource, TWindowClosing> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)                : base(observer, cancel)            {                _parent = parent;            }            private ISubject<TSource> _window;            private object _gate;            private RefCountDisposable _refCountDisposable;                        public IDisposable Run()            {                _window = new Subject<TSource>();                _gate = new object();                var d = new CompositeDisposable(2);                _refCountDisposable = new RefCountDisposable(d);                var window = new WindowObservable<TSource>(_window, _refCountDisposable);                base._observer.OnNext(window);                d.Add(_parent._source.SubscribeSafe(this));                d.Add(_parent._windowBoundaries.SubscribeSafe(new Omega(this)));                return _refCountDisposable;            }            class Omega : IObserver<TWindowClosing>            {                private readonly Beta _parent;                public Omega(Beta parent)                {                    _parent = parent;                }                public void OnNext(TWindowClosing value)                {                    lock (_parent._gate)                    {                        _parent._window.OnCompleted();                        _parent._window = new Subject<TSource>();                        var window = new WindowObservable<TSource>(_parent._window, _parent._refCountDisposable);                        _parent._observer.OnNext(window);                    }                }                public void OnError(Exception error)                {                    _parent.OnError(error);                }                public void OnCompleted()                {                    _parent.OnCompleted();                }            }            public void OnNext(TSource value)            {                lock (_gate)                {                    _window.OnNext(value);                }            }            public void OnError(Exception error)            {                lock (_gate)                {                    _window.OnError(error);                    base._observer.OnError(error);                    base.Dispose();                }            }            public void OnCompleted()            {                lock (_gate)                {                    _window.OnCompleted();                    base._observer.OnCompleted();                    base.Dispose();                }            }        }    }    class WindowObservable<TSource> : AddRef<TSource>    {        public WindowObservable(IObservable<TSource> source, RefCountDisposable refCount)            : base(source, refCount)        {        }    }}#endif
 |