Bladeren bron

Optimizing layouts of Window.

Bart De Smet 8 jaren geleden
bovenliggende
commit
64481f95db

+ 532 - 507
Rx.NET/Source/src/System.Reactive/Linq/Observable/Window.cs

@@ -9,448 +9,444 @@ using System.Reactive.Subjects;
 
 namespace System.Reactive.Linq.ObservableImpl
 {
-    internal sealed class Window<TSource> : Producer<IObservable<TSource>>
+    internal static class Window<TSource>
     {
-        private readonly IObservable<TSource> _source;
-        private readonly int _count;
-        private readonly int _skip;
-
-        private readonly TimeSpan _timeSpan;
-        private readonly TimeSpan _timeShift;
-        private readonly IScheduler _scheduler;
-
-        public Window(IObservable<TSource> source, int count, int skip)
-        {
-            _source = source;
-            _count = count;
-            _skip = skip;
-        }
-
-        public Window(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
+        internal sealed class Count : Producer<IObservable<TSource>>
         {
-            _source = source;
-            _timeSpan = timeSpan;
-            _timeShift = timeShift;
-            _scheduler = scheduler;
-        }
+            private readonly IObservable<TSource> _source;
+            private readonly int _count;
+            private readonly int _skip;
 
-        public Window(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
-        {
-            _source = source;
-            _timeSpan = timeSpan;
-            _count = count;
-            _scheduler = scheduler;
-        }
-
-        protected override IDisposable Run(IObserver<IObservable<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
-        {
-            if (_scheduler == null)
+            public Count(IObservable<TSource> source, int count, int skip)
             {
-                var sink = new _(this, observer, cancel);
-                setSink(sink);
-                return sink.Run();
+                _source = source;
+                _count = count;
+                _skip = skip;
             }
-            else if (_count > 0)
+
+            protected override IDisposable Run(IObserver<IObservable<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
-                var sink = new BoundedWindowImpl(this, observer, cancel);
+                var sink = new _(this, observer, cancel);
                 setSink(sink);
-                return sink.Run();
+                return sink.Run(_source);
             }
-            else
-            {
-                if (_timeSpan == _timeShift)
-                {
-                    var sink = new TimeShiftImpl(this, observer, cancel);
-                    setSink(sink);
-                    return sink.Run();
-                }
-                else
-                {
-                    var sink = new WindowImpl(this, observer, cancel);
-                    setSink(sink);
-                    return sink.Run();
-                }
-            }
-        }
-
-        class _ : Sink<IObservable<TSource>>, IObserver<TSource>
-        {
-            private readonly Window<TSource> _parent;
 
-            public _(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
-                : base(observer, cancel)
+            private sealed class _ : Sink<IObservable<TSource>>, IObserver<TSource>
             {
-                _parent = parent;
-            }
+                private readonly Queue<ISubject<TSource>> _queue = new Queue<ISubject<TSource>>();
+                private readonly SingleAssignmentDisposable _m = new SingleAssignmentDisposable();
+                private readonly RefCountDisposable _refCountDisposable;
 
-            private Queue<ISubject<TSource>> _queue;
-            private int _n;
-            private SingleAssignmentDisposable _m;
-            private RefCountDisposable _refCountDisposable;
+                private readonly int _count;
+                private readonly int _skip;
 
-            public IDisposable Run()
-            {
-                _queue = new Queue<ISubject<TSource>>();
-                _n = 0;
-                _m = new SingleAssignmentDisposable();
-                _refCountDisposable = new RefCountDisposable(_m);
+                public _(Count parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
+                    : base(observer, cancel)
+                {
+                    _refCountDisposable = new RefCountDisposable(_m);
 
-                var firstWindow = CreateWindow();
-                base._observer.OnNext(firstWindow);
+                    _count = parent._count;
+                    _skip = parent._skip;
+                }
 
-                _m.Disposable = _parent._source.SubscribeSafe(this);
+                private int _n;
 
-                return _refCountDisposable;
-            }
+                public IDisposable Run(IObservable<TSource> source)
+                {
+                    _n = 0;
 
-            private IObservable<TSource> CreateWindow()
-            {
-                var s = new Subject<TSource>();
-                _queue.Enqueue(s);
-                return new WindowObservable<TSource>(s, _refCountDisposable);
-            }
+                    var firstWindow = CreateWindow();
+                    base._observer.OnNext(firstWindow);
 
-            public void OnNext(TSource value)
-            {
-                foreach (var s in _queue)
-                    s.OnNext(value);
+                    _m.Disposable = source.SubscribeSafe(this);
 
-                var c = _n - _parent._count + 1;
-                if (c >= 0 && c % _parent._skip == 0)
+                    return _refCountDisposable;
+                }
+
+                private IObservable<TSource> CreateWindow()
                 {
-                    var s = _queue.Dequeue();
-                    s.OnCompleted();
+                    var s = new Subject<TSource>();
+                    _queue.Enqueue(s);
+                    return new WindowObservable<TSource>(s, _refCountDisposable);
                 }
 
-                _n++;
-                if (_n % _parent._skip == 0)
+                public void OnNext(TSource value)
                 {
-                    var newWindow = CreateWindow();
-                    base._observer.OnNext(newWindow);
+                    foreach (var s in _queue)
+                        s.OnNext(value);
+
+                    var c = _n - _count + 1;
+                    if (c >= 0 && c % _skip == 0)
+                    {
+                        var s = _queue.Dequeue();
+                        s.OnCompleted();
+                    }
+
+                    _n++;
+                    if (_n % _skip == 0)
+                    {
+                        var newWindow = CreateWindow();
+                        base._observer.OnNext(newWindow);
+                    }
                 }
-            }
 
-            public void OnError(Exception error)
-            {
-                while (_queue.Count > 0)
-                    _queue.Dequeue().OnError(error);
+                public void OnError(Exception error)
+                {
+                    while (_queue.Count > 0)
+                        _queue.Dequeue().OnError(error);
 
-                base._observer.OnError(error);
-                base.Dispose();
-            }
+                    base._observer.OnError(error);
+                    base.Dispose();
+                }
 
-            public void OnCompleted()
-            {
-                while (_queue.Count > 0)
-                    _queue.Dequeue().OnCompleted();
+                public void OnCompleted()
+                {
+                    while (_queue.Count > 0)
+                        _queue.Dequeue().OnCompleted();
 
-                base._observer.OnCompleted();
-                base.Dispose();
+                    base._observer.OnCompleted();
+                    base.Dispose();
+                }
             }
         }
 
-        class WindowImpl : Sink<IObservable<TSource>>, IObserver<TSource>
+        internal sealed class TimeSliding : Producer<IObservable<TSource>>
         {
-            private readonly Window<TSource> _parent;
+            private readonly IObservable<TSource> _source;
+            private readonly TimeSpan _timeSpan;
+            private readonly TimeSpan _timeShift;
+            private readonly IScheduler _scheduler;
 
-            public WindowImpl(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
-                : base(observer, cancel)
+            public TimeSliding(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
             {
-                _parent = parent;
+                _source = source;
+                _timeSpan = timeSpan;
+                _timeShift = timeShift;
+                _scheduler = scheduler;
             }
 
-            private TimeSpan _totalTime;
-            private TimeSpan _nextShift;
-            private TimeSpan _nextSpan;
-
-            private object _gate;
-            private Queue<ISubject<TSource>> _q;
-
-            private SerialDisposable _timerD;
-            private RefCountDisposable _refCountDisposable;
+            protected override IDisposable Run(IObserver<IObservable<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
+            {
+                var sink = new _(this, observer, cancel);
+                setSink(sink);
+                return sink.Run(this);
+            }
 
-            public IDisposable Run()
+            private sealed class _ : Sink<IObservable<TSource>>, IObserver<TSource>
             {
-                _totalTime = TimeSpan.Zero;
-                _nextShift = _parent._timeShift;
-                _nextSpan = _parent._timeSpan;
+                private readonly object _gate = new object();
+                private readonly Queue<ISubject<TSource>> _q = new Queue<ISubject<TSource>>();
+                private readonly SerialDisposable _timerD = new SerialDisposable();
 
-                _gate = new object();
-                _q = new Queue<ISubject<TSource>>();
+                private readonly IScheduler _scheduler;
+                private readonly TimeSpan _timeShift;
 
-                _timerD = new SerialDisposable();
+                public _(TimeSliding parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
+                    : base(observer, cancel)
+                {
+                    _scheduler = parent._scheduler;
+                    _timeShift = parent._timeShift;
+                }
 
-                var groupDisposable = new CompositeDisposable(2) { _timerD };
-                _refCountDisposable = new RefCountDisposable(groupDisposable);
+                private RefCountDisposable _refCountDisposable;
+                private TimeSpan _totalTime;
+                private TimeSpan _nextShift;
+                private TimeSpan _nextSpan;
 
-                CreateWindow();
-                CreateTimer();
+                public IDisposable Run(TimeSliding parent)
+                {
+                    _totalTime = TimeSpan.Zero;
+                    _nextShift = parent._timeShift;
+                    _nextSpan = parent._timeSpan;
 
-                groupDisposable.Add(_parent._source.SubscribeSafe(this));
+                    var groupDisposable = new CompositeDisposable(2) { _timerD };
+                    _refCountDisposable = new RefCountDisposable(groupDisposable);
 
-                return _refCountDisposable;
-            }
+                    CreateWindow();
+                    CreateTimer();
 
-            private void CreateWindow()
-            {
-                var s = new Subject<TSource>();
-                _q.Enqueue(s);
-                base._observer.OnNext(new WindowObservable<TSource>(s, _refCountDisposable));
-            }
+                    groupDisposable.Add(parent._source.SubscribeSafe(this));
 
-            private void CreateTimer()
-            {
-                var m = new SingleAssignmentDisposable();
-                _timerD.Disposable = m;
+                    return _refCountDisposable;
+                }
 
-                var isSpan = false;
-                var isShift = false;
-                if (_nextSpan == _nextShift)
+                private void CreateWindow()
                 {
-                    isSpan = true;
-                    isShift = true;
+                    var s = new Subject<TSource>();
+                    _q.Enqueue(s);
+                    base._observer.OnNext(new WindowObservable<TSource>(s, _refCountDisposable));
                 }
-                else if (_nextSpan < _nextShift)
-                    isSpan = true;
-                else
-                    isShift = true;
 
-                var newTotalTime = isSpan ? _nextSpan : _nextShift;
-                var ts = newTotalTime - _totalTime;
-                _totalTime = newTotalTime;
+                private void CreateTimer()
+                {
+                    var m = new SingleAssignmentDisposable();
+                    _timerD.Disposable = m;
 
-                if (isSpan)
-                    _nextSpan += _parent._timeShift;
-                if (isShift)
-                    _nextShift += _parent._timeShift;
+                    var isSpan = false;
+                    var isShift = false;
+                    if (_nextSpan == _nextShift)
+                    {
+                        isSpan = true;
+                        isShift = true;
+                    }
+                    else if (_nextSpan < _nextShift)
+                        isSpan = true;
+                    else
+                        isShift = true;
 
-                m.Disposable = _parent._scheduler.Schedule(new State { isSpan = isSpan, isShift = isShift }, ts, Tick);
-            }
+                    var newTotalTime = isSpan ? _nextSpan : _nextShift;
+                    var ts = newTotalTime - _totalTime;
+                    _totalTime = newTotalTime;
 
-            struct State
-            {
-                public bool isSpan;
-                public bool isShift;
-            }
+                    if (isSpan)
+                        _nextSpan += _timeShift;
+                    if (isShift)
+                        _nextShift += _timeShift;
 
-            private IDisposable Tick(IScheduler self, State state)
-            {
-                lock (_gate)
+                    m.Disposable = _scheduler.Schedule(new State { isSpan = isSpan, isShift = isShift }, ts, Tick);
+                }
+
+                struct State
                 {
-                    //
-                    // BREAKING CHANGE v2 > v1.x - Making behavior of sending OnCompleted to the window
-                    //                             before sending out a new window consistent across all
-                    //                             overloads of Window and Buffer. Before v2, the two
-                    //                             operations below were reversed.
-                    //
-                    if (state.isSpan)
-                    {
-                        var s = _q.Dequeue();
-                        s.OnCompleted();
-                    }
+                    public bool isSpan;
+                    public bool isShift;
+                }
 
-                    if (state.isShift)
+                private IDisposable Tick(IScheduler self, State state)
+                {
+                    lock (_gate)
                     {
-                        CreateWindow();
+                        //
+                        // BREAKING CHANGE v2 > v1.x - Making behavior of sending OnCompleted to the window
+                        //                             before sending out a new window consistent across all
+                        //                             overloads of Window and Buffer. Before v2, the two
+                        //                             operations below were reversed.
+                        //
+                        if (state.isSpan)
+                        {
+                            var s = _q.Dequeue();
+                            s.OnCompleted();
+                        }
+
+                        if (state.isShift)
+                        {
+                            CreateWindow();
+                        }
                     }
-                }
 
-                CreateTimer();
+                    CreateTimer();
 
-                return Disposable.Empty;
-            }
+                    return Disposable.Empty;
+                }
 
-            public void OnNext(TSource value)
-            {
-                lock (_gate)
+                public void OnNext(TSource value)
                 {
-                    foreach (var s in _q)
-                        s.OnNext(value);
+                    lock (_gate)
+                    {
+                        foreach (var s in _q)
+                            s.OnNext(value);
+                    }
                 }
-            }
 
-            public void OnError(Exception error)
-            {
-                lock (_gate)
+                public void OnError(Exception error)
                 {
-                    foreach (var s in _q)
-                        s.OnError(error);
+                    lock (_gate)
+                    {
+                        foreach (var s in _q)
+                            s.OnError(error);
 
-                    base._observer.OnError(error);
-                    base.Dispose();
+                        base._observer.OnError(error);
+                        base.Dispose();
+                    }
                 }
-            }
 
-            public void OnCompleted()
-            {
-                lock (_gate)
+                public void OnCompleted()
                 {
-                    foreach (var s in _q)
-                        s.OnCompleted();
+                    lock (_gate)
+                    {
+                        foreach (var s in _q)
+                            s.OnCompleted();
 
-                    base._observer.OnCompleted();
-                    base.Dispose();
+                        base._observer.OnCompleted();
+                        base.Dispose();
+                    }
                 }
             }
         }
 
-        class TimeShiftImpl : Sink<IObservable<TSource>>, IObserver<TSource>
+        internal sealed class TimeHopping : Producer<IObservable<TSource>>
         {
-            private readonly Window<TSource> _parent;
+            private readonly IObservable<TSource> _source;
+            private readonly TimeSpan _timeSpan;
+            private readonly IScheduler _scheduler;
 
-            public TimeShiftImpl(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
-                : base(observer, cancel)
+            public TimeHopping(IObservable<TSource> source, TimeSpan timeSpan, IScheduler scheduler)
             {
-                _parent = parent;
+                _source = source;
+                _timeSpan = timeSpan;
+                _scheduler = scheduler;
             }
 
-            private object _gate;
-            private Subject<TSource> _subject;
-            private RefCountDisposable _refCountDisposable;
+            protected override IDisposable Run(IObserver<IObservable<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
+            {
+                var sink = new _(observer, cancel);
+                setSink(sink);
+                return sink.Run(this);
+            }
 
-            public IDisposable Run()
+            private sealed class _ : Sink<IObservable<TSource>>, IObserver<TSource>
             {
-                _gate = new object();
+                private readonly object _gate = new object();
+
+                public _(IObserver<IObservable<TSource>> observer, IDisposable cancel)
+                    : base(observer, cancel)
+                {
+                }
 
-                var groupDisposable = new CompositeDisposable(2);
-                _refCountDisposable = new RefCountDisposable(groupDisposable);
+                private Subject<TSource> _subject;
+                private RefCountDisposable _refCountDisposable;
 
-                CreateWindow();
+                public IDisposable Run(TimeHopping parent)
+                {
+                    var groupDisposable = new CompositeDisposable(2);
+                    _refCountDisposable = new RefCountDisposable(groupDisposable);
 
-                groupDisposable.Add(_parent._scheduler.SchedulePeriodic(_parent._timeSpan, Tick));
-                groupDisposable.Add(_parent._source.SubscribeSafe(this));
+                    CreateWindow();
 
-                return _refCountDisposable;
-            }
+                    groupDisposable.Add(parent._scheduler.SchedulePeriodic(parent._timeSpan, Tick));
+                    groupDisposable.Add(parent._source.SubscribeSafe(this));
 
-            private void Tick()
-            {
-                lock (_gate)
+                    return _refCountDisposable;
+                }
+
+                private void Tick()
                 {
-                    _subject.OnCompleted();
-                    CreateWindow();
+                    lock (_gate)
+                    {
+                        _subject.OnCompleted();
+                        CreateWindow();
+                    }
                 }
-            }
 
-            private void CreateWindow()
-            {
-                _subject = new Subject<TSource>();
-                base._observer.OnNext(new WindowObservable<TSource>(_subject, _refCountDisposable));
-            }
+                private void CreateWindow()
+                {
+                    _subject = new Subject<TSource>();
+                    base._observer.OnNext(new WindowObservable<TSource>(_subject, _refCountDisposable));
+                }
 
-            public void OnNext(TSource value)
-            {
-                lock (_gate)
+                public void OnNext(TSource value)
                 {
-                    _subject.OnNext(value);
+                    lock (_gate)
+                    {
+                        _subject.OnNext(value);
+                    }
                 }
-            }
 
-            public void OnError(Exception error)
-            {
-                lock (_gate)
+                public void OnError(Exception error)
                 {
-                    _subject.OnError(error);
+                    lock (_gate)
+                    {
+                        _subject.OnError(error);
 
-                    base._observer.OnError(error);
-                    base.Dispose();
+                        base._observer.OnError(error);
+                        base.Dispose();
+                    }
                 }
-            }
 
-            public void OnCompleted()
-            {
-                lock (_gate)
+                public void OnCompleted()
                 {
-                    _subject.OnCompleted();
+                    lock (_gate)
+                    {
+                        _subject.OnCompleted();
 
-                    base._observer.OnCompleted();
-                    base.Dispose();
+                        base._observer.OnCompleted();
+                        base.Dispose();
+                    }
                 }
             }
         }
 
-        class BoundedWindowImpl : Sink<IObservable<TSource>>, IObserver<TSource>
+        internal sealed class Ferry : Producer<IObservable<TSource>>
         {
-            private readonly Window<TSource> _parent;
+            private readonly IObservable<TSource> _source;
+            private readonly int _count;
+            private readonly TimeSpan _timeSpan;
+            private readonly IScheduler _scheduler;
 
-            public BoundedWindowImpl(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
-                : base(observer, cancel)
+            public Ferry(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
             {
-                _parent = parent;
+                _source = source;
+                _timeSpan = timeSpan;
+                _count = count;
+                _scheduler = scheduler;
             }
 
-            private object _gate;
-            private ISubject<TSource> _s;
-            private int _n;
-            private int _windowId;
-
-            private SerialDisposable _timerD;
-            private RefCountDisposable _refCountDisposable;
-
-            public IDisposable Run()
+            protected override IDisposable Run(IObserver<IObservable<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
-                _gate = new object();
-                _s = default(ISubject<TSource>);
-                _n = 0;
-                _windowId = 0;
-
-                _timerD = new SerialDisposable();
-                var groupDisposable = new CompositeDisposable(2) { _timerD };
-                _refCountDisposable = new RefCountDisposable(groupDisposable);
-
-                _s = new Subject<TSource>();
-                base._observer.OnNext(new WindowObservable<TSource>(_s, _refCountDisposable));
-                CreateTimer(0);
-
-                groupDisposable.Add(_parent._source.SubscribeSafe(this));
-
-                return _refCountDisposable;
+                var sink = new _(this, observer, cancel);
+                setSink(sink);
+                return sink.Run(_source);
             }
 
-            private void CreateTimer(int id)
+            private sealed class _ : Sink<IObservable<TSource>>, IObserver<TSource>
             {
-                var m = new SingleAssignmentDisposable();
-                _timerD.Disposable = m;
-
-                m.Disposable = _parent._scheduler.Schedule(id, _parent._timeSpan, Tick);
-            }
+                private readonly object _gate = new object();
+                private readonly SerialDisposable _timerD = new SerialDisposable();
 
-            private IDisposable Tick(IScheduler self, int id)
-            {
-                var d = Disposable.Empty;
+                private readonly int _count;
+                private readonly TimeSpan _timeSpan;
+                private readonly IScheduler _scheduler;
 
-                var newId = 0;
-                lock (_gate)
+                public _(Ferry parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
+                    : base(observer, cancel)
                 {
-                    if (id != _windowId)
-                        return d;
+                    _count = parent._count;
+                    _timeSpan = parent._timeSpan;
+                    _scheduler = parent._scheduler;
+                }
+
+                private ISubject<TSource> _s;
+                private int _n;
+                private int _windowId;
+
+                private RefCountDisposable _refCountDisposable;
 
+                public IDisposable Run(IObservable<TSource> source)
+                {
+                    _s = default(ISubject<TSource>);
                     _n = 0;
-                    newId = ++_windowId;
+                    _windowId = 0;
+
+                    var groupDisposable = new CompositeDisposable(2) { _timerD };
+                    _refCountDisposable = new RefCountDisposable(groupDisposable);
 
-                    _s.OnCompleted();
                     _s = new Subject<TSource>();
                     base._observer.OnNext(new WindowObservable<TSource>(_s, _refCountDisposable));
-                }
+                    CreateTimer(0);
 
-                CreateTimer(newId);
+                    groupDisposable.Add(source.SubscribeSafe(this));
 
-                return d;
-            }
+                    return _refCountDisposable;
+                }
 
-            public void OnNext(TSource value)
-            {
-                var newWindow = false;
-                var newId = 0;
+                private void CreateTimer(int id)
+                {
+                    var m = new SingleAssignmentDisposable();
+                    _timerD.Disposable = m;
+
+                    m.Disposable = _scheduler.Schedule(id, _timeSpan, Tick);
+                }
 
-                lock (_gate)
+                private IDisposable Tick(IScheduler self, int id)
                 {
-                    _s.OnNext(value);
+                    var d = Disposable.Empty;
 
-                    _n++;
-                    if (_n == _parent._count)
+                    var newId = 0;
+                    lock (_gate)
                     {
-                        newWindow = true;
+                        if (id != _windowId)
+                            return d;
+
                         _n = 0;
                         newId = ++_windowId;
 
@@ -458,288 +454,317 @@ namespace System.Reactive.Linq.ObservableImpl
                         _s = new Subject<TSource>();
                         base._observer.OnNext(new WindowObservable<TSource>(_s, _refCountDisposable));
                     }
-                }
 
-                if (newWindow)
                     CreateTimer(newId);
-            }
 
-            public void OnError(Exception error)
-            {
-                lock (_gate)
+                    return d;
+                }
+
+                public void OnNext(TSource value)
                 {
-                    _s.OnError(error);
-                    base._observer.OnError(error);
-                    base.Dispose();
+                    var newWindow = false;
+                    var newId = 0;
+
+                    lock (_gate)
+                    {
+                        _s.OnNext(value);
+
+                        _n++;
+                        if (_n == _count)
+                        {
+                            newWindow = true;
+                            _n = 0;
+                            newId = ++_windowId;
+
+                            _s.OnCompleted();
+                            _s = new Subject<TSource>();
+                            base._observer.OnNext(new WindowObservable<TSource>(_s, _refCountDisposable));
+                        }
+                    }
+
+                    if (newWindow)
+                        CreateTimer(newId);
                 }
-            }
 
-            public void OnCompleted()
-            {
-                lock (_gate)
+                public void OnError(Exception error)
                 {
-                    _s.OnCompleted();
-                    base._observer.OnCompleted();
-                    base.Dispose();
+                    lock (_gate)
+                    {
+                        _s.OnError(error);
+                        base._observer.OnError(error);
+                        base.Dispose();
+                    }
+                }
+
+                public void OnCompleted()
+                {
+                    lock (_gate)
+                    {
+                        _s.OnCompleted();
+                        base._observer.OnCompleted();
+                        base.Dispose();
+                    }
                 }
             }
         }
     }
 
-    internal sealed class Window<TSource, TWindowClosing> : Producer<IObservable<TSource>>
+    internal static class Window<TSource, TWindowClosing>
     {
-        private readonly IObservable<TSource> _source;
-        private readonly Func<IObservable<TWindowClosing>> _windowClosingSelector;
-        private readonly IObservable<TWindowClosing> _windowBoundaries;
-
-        public Window(IObservable<TSource> source, Func<IObservable<TWindowClosing>> windowClosingSelector)
+        internal sealed class Selector : Producer<IObservable<TSource>>
         {
-            _source = source;
-            _windowClosingSelector = windowClosingSelector;
-        }
+            private readonly IObservable<TSource> _source;
+            private readonly Func<IObservable<TWindowClosing>> _windowClosingSelector;
 
-        public Window(IObservable<TSource> source, IObservable<TWindowClosing> windowBoundaries)
-        {
-            _source = source;
-            _windowBoundaries = windowBoundaries;
-        }
-
-        protected override IDisposable Run(IObserver<IObservable<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
-        {
-            if (_windowClosingSelector != null)
+            public Selector(IObservable<TSource> source, Func<IObservable<TWindowClosing>> windowClosingSelector)
             {
-                var sink = new _(this, observer, cancel);
-                setSink(sink);
-                return sink.Run();
+                _source = source;
+                _windowClosingSelector = windowClosingSelector;
             }
-            else
+
+            protected override IDisposable Run(IObserver<IObservable<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
-                var sink = new Beta(this, observer, cancel);
+                var sink = new _(this, observer, cancel);
                 setSink(sink);
-                return sink.Run();
+                return sink.Run(_source);
             }
-        }
-
-        class _ : Sink<IObservable<TSource>>, IObserver<TSource>
-        {
-            private readonly Window<TSource, TWindowClosing> _parent;
 
-            public _(Window<TSource, TWindowClosing> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
-                : base(observer, cancel)
+            private sealed class _ : Sink<IObservable<TSource>>, IObserver<TSource>
             {
-                _parent = parent;
-            }
+                private readonly object _gate = new object();
+                private readonly AsyncLock _windowGate = new AsyncLock();
+                private readonly SerialDisposable _m = new SerialDisposable();
 
-            private ISubject<TSource> _window;
-            private object _gate;
-            private AsyncLock _windowGate;
+                private readonly Func<IObservable<TWindowClosing>> _windowClosingSelector;
 
-            private SerialDisposable _m;
-            private RefCountDisposable _refCountDisposable;
+                public _(Selector parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
+                    : base(observer, cancel)
+                {
+                    _windowClosingSelector = parent._windowClosingSelector;
+                }
 
-            public IDisposable Run()
-            {
-                _window = new Subject<TSource>();
-                _gate = new object();
-                _windowGate = new AsyncLock();
+                private ISubject<TSource> _window;
+                private RefCountDisposable _refCountDisposable;
 
-                _m = new SerialDisposable();
-                var groupDisposable = new CompositeDisposable(2) { _m };
-                _refCountDisposable = new RefCountDisposable(groupDisposable);
+                public IDisposable Run(IObservable<TSource> source)
+                {
+                    _window = new Subject<TSource>();
 
-                var window = new WindowObservable<TSource>(_window, _refCountDisposable);
-                base._observer.OnNext(window);
+                    var groupDisposable = new CompositeDisposable(2) { _m };
+                    _refCountDisposable = new RefCountDisposable(groupDisposable);
 
-                groupDisposable.Add(_parent._source.SubscribeSafe(this));
+                    var window = new WindowObservable<TSource>(_window, _refCountDisposable);
+                    base._observer.OnNext(window);
 
-                _windowGate.Wait(CreateWindowClose);
+                    groupDisposable.Add(source.SubscribeSafe(this));
 
-                return _refCountDisposable;
-            }
+                    _windowGate.Wait(CreateWindowClose);
 
-            private void CreateWindowClose()
-            {
-                var windowClose = default(IObservable<TWindowClosing>);
-                try
-                {
-                    windowClose = _parent._windowClosingSelector();
+                    return _refCountDisposable;
                 }
-                catch (Exception exception)
+
+                private void CreateWindowClose()
                 {
-                    lock (_gate)
+                    var windowClose = default(IObservable<TWindowClosing>);
+                    try
                     {
-                        base._observer.OnError(exception);
-                        base.Dispose();
+                        windowClose = _windowClosingSelector();
                     }
-                    return;
+                    catch (Exception exception)
+                    {
+                        lock (_gate)
+                        {
+                            base._observer.OnError(exception);
+                            base.Dispose();
+                        }
+                        return;
+                    }
+
+                    var closingSubscription = new SingleAssignmentDisposable();
+                    _m.Disposable = closingSubscription;
+                    closingSubscription.Disposable = windowClose.SubscribeSafe(new WindowClosingObserver(this, closingSubscription));
                 }
 
-                var closingSubscription = new SingleAssignmentDisposable();
-                _m.Disposable = closingSubscription;
-                closingSubscription.Disposable = windowClose.SubscribeSafe(new Omega(this, closingSubscription));
-            }
+                private void CloseWindow(IDisposable closingSubscription)
+                {
+                    closingSubscription.Dispose();
 
-            private void CloseWindow(IDisposable closingSubscription)
-            {
-                closingSubscription.Dispose();
+                    lock (_gate)
+                    {
+                        _window.OnCompleted();
+                        _window = new Subject<TSource>();
 
-                lock (_gate)
-                {
-                    _window.OnCompleted();
-                    _window = new Subject<TSource>();
+                        var window = new WindowObservable<TSource>(_window, _refCountDisposable);
+                        base._observer.OnNext(window);
+                    }
 
-                    var window = new WindowObservable<TSource>(_window, _refCountDisposable);
-                    base._observer.OnNext(window);
+                    _windowGate.Wait(CreateWindowClose);
                 }
 
-                _windowGate.Wait(CreateWindowClose);
-            }
+                private sealed class WindowClosingObserver : IObserver<TWindowClosing>
+                {
+                    private readonly _ _parent;
+                    private readonly IDisposable _self;
 
-            class Omega : IObserver<TWindowClosing>
-            {
-                private readonly _ _parent;
-                private readonly IDisposable _self;
+                    public WindowClosingObserver(_ parent, IDisposable self)
+                    {
+                        _parent = parent;
+                        _self = self;
+                    }
 
-                public Omega(_ parent, IDisposable self)
-                {
-                    _parent = parent;
-                    _self = self;
+                    public void OnNext(TWindowClosing value)
+                    {
+                        _parent.CloseWindow(_self);
+                    }
+
+                    public void OnError(Exception error)
+                    {
+                        _parent.OnError(error);
+                    }
+
+                    public void OnCompleted()
+                    {
+                        _parent.CloseWindow(_self);
+                    }
                 }
 
-                public void OnNext(TWindowClosing value)
+                public void OnNext(TSource value)
                 {
-                    _parent.CloseWindow(_self);
+                    lock (_gate)
+                    {
+                        _window.OnNext(value);
+                    }
                 }
 
                 public void OnError(Exception error)
                 {
-                    _parent.OnError(error);
+                    lock (_gate)
+                    {
+                        _window.OnError(error);
+                        base._observer.OnError(error);
+                        base.Dispose();
+                    }
                 }
 
                 public void OnCompleted()
                 {
-                    _parent.CloseWindow(_self);
+                    lock (_gate)
+                    {
+                        _window.OnCompleted();
+                        base._observer.OnCompleted();
+                        base.Dispose();
+                    }
                 }
             }
+        }
 
-            public void OnNext(TSource value)
-            {
-                lock (_gate)
-                {
-                    _window.OnNext(value);
-                }
-            }
+        internal sealed class Boundaries : Producer<IObservable<TSource>>
+        {
+            private readonly IObservable<TSource> _source;
+            private readonly IObservable<TWindowClosing> _windowBoundaries;
 
-            public void OnError(Exception error)
+            public Boundaries(IObservable<TSource> source, IObservable<TWindowClosing> windowBoundaries)
             {
-                lock (_gate)
-                {
-                    _window.OnError(error);
-                    base._observer.OnError(error);
-                    base.Dispose();
-                }
+                _source = source;
+                _windowBoundaries = windowBoundaries;
             }
 
-            public void OnCompleted()
+            protected override IDisposable Run(IObserver<IObservable<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
-                lock (_gate)
-                {
-                    _window.OnCompleted();
-                    base._observer.OnCompleted();
-                    base.Dispose();
-                }
+                var sink = new _(this, observer, cancel);
+                setSink(sink);
+                return sink.Run(this);
             }
-        }
-
-        class Beta : Sink<IObservable<TSource>>, IObserver<TSource>
-        {
-            private readonly Window<TSource, TWindowClosing> _parent;
 
-            public Beta(Window<TSource, TWindowClosing> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
-                : base(observer, cancel)
+            private sealed class _ : Sink<IObservable<TSource>>, IObserver<TSource>
             {
-                _parent = parent;
-            }
-
-            private ISubject<TSource> _window;
-            private object _gate;
+                private readonly object _gate = new object();
 
-            private RefCountDisposable _refCountDisposable;            
+                private readonly IObservable<TWindowClosing> _windowBoundaries;
 
-            public IDisposable Run()
-            {
-                _window = new Subject<TSource>();
-                _gate = new object();
+                public _(Boundaries parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
+                    : base(observer, cancel)
+                {
+                    _windowBoundaries = parent._windowBoundaries;
+                }
 
-                var d = new CompositeDisposable(2);
-                _refCountDisposable = new RefCountDisposable(d);
+                private ISubject<TSource> _window;
+                private RefCountDisposable _refCountDisposable;
 
-                var window = new WindowObservable<TSource>(_window, _refCountDisposable);
-                base._observer.OnNext(window);
+                public IDisposable Run(Boundaries parent)
+                {
+                    _window = new Subject<TSource>();
 
-                d.Add(_parent._source.SubscribeSafe(this));
-                d.Add(_parent._windowBoundaries.SubscribeSafe(new Omega(this)));
+                    var d = new CompositeDisposable(2);
+                    _refCountDisposable = new RefCountDisposable(d);
 
-                return _refCountDisposable;
-            }
+                    var window = new WindowObservable<TSource>(_window, _refCountDisposable);
+                    base._observer.OnNext(window);
 
-            class Omega : IObserver<TWindowClosing>
-            {
-                private readonly Beta _parent;
+                    d.Add(parent._source.SubscribeSafe(this));
+                    d.Add(parent._windowBoundaries.SubscribeSafe(new WindowClosingObserver(this)));
 
-                public Omega(Beta parent)
-                {
-                    _parent = parent;
+                    return _refCountDisposable;
                 }
 
-                public void OnNext(TWindowClosing value)
+                private sealed class WindowClosingObserver : IObserver<TWindowClosing>
                 {
-                    lock (_parent._gate)
+                    private readonly _ _parent;
+
+                    public WindowClosingObserver(_ parent)
                     {
-                        _parent._window.OnCompleted();
-                        _parent._window = new Subject<TSource>();
+                        _parent = parent;
+                    }
 
-                        var window = new WindowObservable<TSource>(_parent._window, _parent._refCountDisposable);
-                        _parent._observer.OnNext(window);
+                    public void OnNext(TWindowClosing value)
+                    {
+                        lock (_parent._gate)
+                        {
+                            _parent._window.OnCompleted();
+                            _parent._window = new Subject<TSource>();
+
+                            var window = new WindowObservable<TSource>(_parent._window, _parent._refCountDisposable);
+                            _parent._observer.OnNext(window);
+                        }
                     }
-                }
 
-                public void OnError(Exception error)
-                {
-                    _parent.OnError(error);
-                }
+                    public void OnError(Exception error)
+                    {
+                        _parent.OnError(error);
+                    }
 
-                public void OnCompleted()
-                {
-                    _parent.OnCompleted();
+                    public void OnCompleted()
+                    {
+                        _parent.OnCompleted();
+                    }
                 }
-            }
 
-            public void OnNext(TSource value)
-            {
-                lock (_gate)
+                public void OnNext(TSource value)
                 {
-                    _window.OnNext(value);
+                    lock (_gate)
+                    {
+                        _window.OnNext(value);
+                    }
                 }
-            }
 
-            public void OnError(Exception error)
-            {
-                lock (_gate)
+                public void OnError(Exception error)
                 {
-                    _window.OnError(error);
-                    base._observer.OnError(error);
-                    base.Dispose();
+                    lock (_gate)
+                    {
+                        _window.OnError(error);
+                        base._observer.OnError(error);
+                        base.Dispose();
+                    }
                 }
-            }
 
-            public void OnCompleted()
-            {
-                lock (_gate)
+                public void OnCompleted()
                 {
-                    _window.OnCompleted();
-                    base._observer.OnCompleted();
-                    base.Dispose();
+                    lock (_gate)
+                    {
+                        _window.OnCompleted();
+                        base._observer.OnCompleted();
+                        base.Dispose();
+                    }
                 }
             }
         }

+ 2 - 2
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Multiple.cs

@@ -369,7 +369,7 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<IObservable<TSource>> Window<TSource, TWindowClosing>(IObservable<TSource> source, Func<IObservable<TWindowClosing>> windowClosingSelector)
         {
-            return new Window<TSource, TWindowClosing>(source, windowClosingSelector);
+            return new Window<TSource, TWindowClosing>.Selector(source, windowClosingSelector);
         }
 
         public virtual IObservable<IObservable<TSource>> Window<TSource, TWindowOpening, TWindowClosing>(IObservable<TSource> source, IObservable<TWindowOpening> windowOpenings, Func<TWindowOpening, IObservable<TWindowClosing>> windowClosingSelector)
@@ -379,7 +379,7 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<IObservable<TSource>> Window<TSource, TWindowBoundary>(IObservable<TSource> source, IObservable<TWindowBoundary> windowBoundaries)
         {
-            return new Window<TSource, TWindowBoundary>(source, windowBoundaries);
+            return new Window<TSource, TWindowBoundary>.Boundaries(source, windowBoundaries);
         }
 
         #endregion

+ 1 - 1
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Single.cs

@@ -294,7 +294,7 @@ namespace System.Reactive.Linq
 
         private static IObservable<IObservable<TSource>> Window_<TSource>(IObservable<TSource> source, int count, int skip)
         {
-            return new Window<TSource>(source, count, skip);
+            return new Window<TSource>.Count(source, count, skip);
         }
 
         #endregion

+ 9 - 4
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Time.cs

@@ -615,12 +615,17 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan)
         {
-            return Window_<TSource>(source, timeSpan, timeSpan, SchedulerDefaults.TimeBasedOperations);
+            return Window_<TSource>(source, timeSpan, SchedulerDefaults.TimeBasedOperations);
         }
 
         public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, IScheduler scheduler)
         {
-            return Window_<TSource>(source, timeSpan, timeSpan, scheduler);
+            return Window_<TSource>(source, timeSpan, scheduler);
+        }
+
+        private static IObservable<IObservable<TSource>> Window_<TSource>(IObservable<TSource> source, TimeSpan timeSpan, IScheduler scheduler)
+        {
+            return new Window<TSource>.TimeHopping(source, timeSpan, scheduler);
         }
 
         public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift)
@@ -635,7 +640,7 @@ namespace System.Reactive.Linq
 
         private static IObservable<IObservable<TSource>> Window_<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
         {
-            return new Window<TSource>(source, timeSpan, timeShift, scheduler);
+            return new Window<TSource>.TimeSliding(source, timeSpan, timeShift, scheduler);
         }
 
         #endregion
@@ -654,7 +659,7 @@ namespace System.Reactive.Linq
 
         private static IObservable<IObservable<TSource>> Window_<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
         {
-            return new Window<TSource>(source, timeSpan, count, scheduler);
+            return new Window<TSource>.Ferry(source, timeSpan, count, scheduler);
         }
 
         #endregion