Browse Source

Adding CreateSink.

Bart De Smet 8 years ago
parent
commit
b12feed548
99 changed files with 465 additions and 3 deletions
  1. 4 0
      Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.ObserveOn.cs
  2. 2 0
      Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.Synchronize.cs
  3. 2 0
      Rx.NET/Source/src/System.Reactive/Internal/Producer.cs
  4. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/AddRef.cs
  5. 6 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Aggregate.cs
  6. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/All.cs
  7. 3 1
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Amb.cs
  8. 4 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Any.cs
  9. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/AsObservable.cs
  10. 20 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Average.cs
  11. 12 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Buffer.cs
  12. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Case.cs
  13. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Cast.cs
  14. 4 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Catch.cs
  15. 29 1
      Rx.NET/Source/src/System.Reactive/Linq/Observable/CombineLatest.Generated.cs
  16. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/CombineLatest.Generated.tt
  17. 4 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/CombineLatest.cs
  18. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Concat.cs
  19. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Contains.cs
  20. 4 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Count.cs
  21. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/DefaultIfEmpty.cs
  22. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Defer.cs
  23. 6 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs
  24. 4 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/DelaySubscription.cs
  25. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Dematerialize.cs
  26. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Distinct.cs
  27. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/DistinctUntilChanged.cs
  28. 6 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Do.cs
  29. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/DoWhile.cs
  30. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/ElementAt.cs
  31. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/ElementAtOrDefault.cs
  32. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Empty.cs
  33. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Finally.cs
  34. 4 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/FirstAsync.cs
  35. 4 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/FirstOrDefaultAsync.cs
  36. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/For.cs
  37. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/FromEvent.cs
  38. 6 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Generate.cs
  39. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/GroupBy.cs
  40. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/GroupByUntil.cs
  41. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/GroupJoin.cs
  42. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/If.cs
  43. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/IgnoreElements.cs
  44. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/IsEmpty.cs
  45. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Join.cs
  46. 4 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/LastAsync.cs
  47. 4 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/LastOrDefaultAsync.cs
  48. 4 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/LongCount.cs
  49. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Materialize.cs
  50. 22 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Max.cs
  51. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/MaxBy.cs
  52. 6 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Merge.cs
  53. 22 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Min.cs
  54. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/MinBy.cs
  55. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Multicast.cs
  56. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/OfType.cs
  57. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/OnErrorResumeNext.cs
  58. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Range.cs
  59. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs
  60. 4 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Repeat.cs
  61. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Return.cs
  62. 4 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Sample.cs
  63. 4 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Scan.cs
  64. 4 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Select.cs
  65. 26 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/SelectMany.cs
  66. 4 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/SequenceEqual.cs
  67. 4 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/SingleAsync.cs
  68. 4 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/SingleOrDefaultAsync.cs
  69. 4 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Skip.cs
  70. 4 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipLast.cs
  71. 4 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipUntil.cs
  72. 4 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipWhile.cs
  73. 20 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Sum.cs
  74. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Switch.cs
  75. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Synchronize.cs
  76. 4 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Take.cs
  77. 4 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeLast.cs
  78. 4 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeLastBuffer.cs
  79. 4 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeUntil.cs
  80. 4 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeWhile.cs
  81. 4 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs
  82. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Throw.cs
  83. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/TimeInterval.cs
  84. 6 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Timeout.cs
  85. 8 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Timer.cs
  86. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Timestamp.cs
  87. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/ToArray.cs
  88. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/ToDictionary.cs
  89. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/ToList.cs
  90. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/ToLookup.cs
  91. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/ToObservable.cs
  92. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Using.cs
  93. 4 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Where.cs
  94. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/While.cs
  95. 12 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Window.cs
  96. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/WithLatestFrom.cs
  97. 29 1
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.Generated.cs
  98. 2 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.Generated.tt
  99. 6 0
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.ObserveOn.cs

@@ -20,6 +20,8 @@ namespace System.Reactive.Concurrency
                 _scheduler = scheduler;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new ObserveOnObserver<TSource>(_scheduler, observer, cancel);
+
             [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods", MessageId = "2", Justification = "Visibility restricted to friend assemblies. Those should be correct by inspection.")]
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
@@ -40,6 +42,8 @@ namespace System.Reactive.Concurrency
                 _context = context;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(_context, observer, cancel);
+
             [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods", MessageId = "2", Justification = "Visibility restricted to friend assemblies. Those should be correct by inspection.")]
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.Synchronize.cs

@@ -20,6 +20,8 @@ namespace System.Reactive.Concurrency
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(this, observer, cancel);
+
         [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods", MessageId = "2", Justification = "Visibility restricted to friend assemblies. Those should be correct by inspection.")]
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Internal/Producer.cs

@@ -92,5 +92,7 @@ namespace System.Reactive
         /// <returns>Disposable representing all the resources and/or subscriptions the operator uses to process events.</returns>
         /// <remarks>The <paramref name="observer">observer</paramref> passed in to this method is not protected using auto-detach behavior upon an OnError or OnCompleted call. The implementation must ensure proper resource disposal and enforce the message grammar.</remarks>
         protected abstract IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink);
+
+        protected abstract IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel);
     }
 }

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/AddRef.cs

@@ -17,6 +17,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _refCount = refCount;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, StableCompositeDisposable.Create(_refCount.GetDisposable(), cancel));
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var d = StableCompositeDisposable.Create(_refCount.GetDisposable(), cancel);

+ 6 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Aggregate.cs

@@ -15,6 +15,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _accumulator = accumulator;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(_accumulator, observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_accumulator, observer, cancel);
@@ -93,6 +95,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _accumulator = accumulator;
         }
 
+        protected override IDisposable CreateSink(IObserver<TAccumulate> observer, IDisposable cancel) => new _(_seed, _accumulator, observer, cancel);
+
         protected override IDisposable Run(IObserver<TAccumulate> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_seed, _accumulator, observer, cancel);
@@ -155,6 +159,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(this, observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/All.cs

@@ -15,6 +15,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _predicate = predicate;
         }
 
+        protected override IDisposable CreateSink(IObserver<bool> observer, IDisposable cancel) => new _(_predicate, observer, cancel);
+
         protected override IDisposable Run(IObserver<bool> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_predicate, observer, cancel);

+ 3 - 1
Rx.NET/Source/src/System.Reactive/Linq/Observable/Amb.cs

@@ -17,9 +17,11 @@ namespace System.Reactive.Linq.ObservableImpl
             _right = right;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
-            var sink = new _( observer, cancel);
+            var sink = new _(observer, cancel);
             setSink(sink);
             return sink.Run(this);
         }

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Any.cs

@@ -15,6 +15,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _source = source;
             }
 
+            protected override IDisposable CreateSink(IObserver<bool> observer, IDisposable cancel) => new _(observer, cancel);
+
             protected override IDisposable Run(IObserver<bool> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(observer, cancel);
@@ -62,6 +64,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _predicate = predicate;
             }
 
+            protected override IDisposable CreateSink(IObserver<bool> observer, IDisposable cancel) => new _(_predicate, observer, cancel);
+
             protected override IDisposable Run(IObserver<bool> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_predicate, observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/AsObservable.cs

@@ -15,6 +15,8 @@ namespace System.Reactive.Linq.ObservableImpl
 
         public IObservable<TSource> Eval() => _source;
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);

+ 20 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Average.cs

@@ -13,6 +13,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<double> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<double> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -81,6 +83,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<float> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<float> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -149,6 +153,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<decimal> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<decimal> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -217,6 +223,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<double> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<double> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -285,6 +293,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<double> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<double> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -353,6 +363,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<double?> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<double?> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -424,6 +436,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<float?> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<float?> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -495,6 +509,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<decimal?> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<decimal?> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -566,6 +582,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<double?> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<double?> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -637,6 +655,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<double?> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<double?> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);

+ 12 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Buffer.cs

@@ -23,6 +23,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _skip = skip;
             }
 
+            protected override IDisposable CreateSink(IObserver<IList<TSource>> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(this, observer, cancel);
@@ -118,6 +120,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _scheduler = scheduler;
             }
 
+            protected override IDisposable CreateSink(IObserver<IList<TSource>> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(this, observer, cancel);
@@ -276,6 +280,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _scheduler = scheduler;
             }
 
+            protected override IDisposable CreateSink(IObserver<IList<TSource>> observer, IDisposable cancel) => new _(observer, cancel);
+
             protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _( observer, cancel);
@@ -359,6 +365,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _scheduler = scheduler;
             }
 
+            protected override IDisposable CreateSink(IObserver<IList<TSource>> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(this, observer, cancel);
@@ -489,6 +497,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _bufferClosingSelector = bufferClosingSelector;
             }
 
+            protected override IDisposable CreateSink(IObserver<IList<TSource>> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(this, observer, cancel);
@@ -627,6 +637,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _bufferBoundaries = bufferBoundaries;
             }
 
+            protected override IDisposable CreateSink(IObserver<IList<TSource>> observer, IDisposable cancel) => new _(observer, cancel);
+
             protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Case.cs

@@ -29,6 +29,8 @@ namespace System.Reactive.Linq.ObservableImpl
             return _defaultSource;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Cast.cs

@@ -13,6 +13,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Catch.cs

@@ -16,6 +16,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _sources = sources;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -91,6 +93,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _handler = handler;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(_handler, observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_handler, observer, cancel);

+ 29 - 1
Rx.NET/Source/src/System.Reactive/Linq/Observable/CombineLatest.Generated.cs

@@ -10,7 +10,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
     /* The following code is generated by a T4 template. */
 
-    #region CombineLatest auto-generated code (4/17/2017 3:25:27 PM)
+    #region CombineLatest auto-generated code (4/20/2017 2:55:11 PM)
 
     internal sealed class CombineLatest<T1, T2, T3, TResult> : Producer<TResult>
     {
@@ -27,6 +27,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);
@@ -86,6 +88,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);
@@ -150,6 +154,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);
@@ -219,6 +225,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);
@@ -293,6 +301,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);
@@ -372,6 +382,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);
@@ -456,6 +468,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);
@@ -545,6 +559,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);
@@ -639,6 +655,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);
@@ -738,6 +756,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);
@@ -842,6 +862,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);
@@ -951,6 +973,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);
@@ -1065,6 +1089,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);
@@ -1184,6 +1210,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/CombineLatest.Generated.tt

@@ -51,6 +51,8 @@ for (var j = 1; j <= i; j++)
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/CombineLatest.cs

@@ -24,6 +24,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);
@@ -403,6 +405,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Concat.cs

@@ -15,6 +15,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _sources = sources;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Contains.cs

@@ -19,6 +19,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _comparer = comparer;
         }
 
+        protected override IDisposable CreateSink(IObserver<bool> observer, IDisposable cancel) => new _(this, observer, cancel);
+
         protected override IDisposable Run(IObserver<bool> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(this, observer, cancel);

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Count.cs

@@ -15,6 +15,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _source = source;
             }
 
+            protected override IDisposable CreateSink(IObserver<int> observer, IDisposable cancel) => new _(observer, cancel);
+
             protected override IDisposable Run(IObserver<int> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(observer, cancel);
@@ -74,6 +76,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _predicate = predicate;
             }
 
+            protected override IDisposable CreateSink(IObserver<int> observer, IDisposable cancel) => new _(_predicate, observer, cancel);
+
             protected override IDisposable Run(IObserver<int> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_predicate, observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/DefaultIfEmpty.cs

@@ -15,6 +15,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _defaultValue = defaultValue;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(_defaultValue, observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_defaultValue, observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Defer.cs

@@ -15,6 +15,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _observableFactory = observableFactory;
         }
 
+        protected override IDisposable CreateSink(IObserver<TValue> observer, IDisposable cancel) => new _(_observableFactory, observer, cancel);
+
         protected override IDisposable Run(IObserver<TValue> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_observableFactory, observer, cancel);

+ 6 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs

@@ -454,6 +454,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _dueTime = dueTime;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => _scheduler.AsLongRunning() != null ? (IDisposable)new L(this, observer, cancel) : new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 if (_scheduler.AsLongRunning() != null)
@@ -563,6 +565,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _dueTime = dueTime;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => _scheduler.AsLongRunning() != null ? (IDisposable)new L(this, observer, cancel) : new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 if (_scheduler.AsLongRunning() != null)
@@ -623,6 +627,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _delaySelector = delaySelector;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _<Selector>(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _<Selector>(this, observer, cancel);

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/DelaySubscription.cs

@@ -27,6 +27,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _dueTime = dueTime;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(observer, cancel);
@@ -45,6 +47,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _dueTime = dueTime;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Dematerialize.cs

@@ -13,6 +13,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Distinct.cs

@@ -19,6 +19,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _comparer = comparer;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(this, observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(this, observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/DistinctUntilChanged.cs

@@ -19,6 +19,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _comparer = comparer;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(this, observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(this, observer, cancel);

+ 6 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Do.cs

@@ -17,6 +17,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _onNext = onNext;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(_onNext, observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_onNext, observer, cancel);
@@ -75,6 +77,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer = observer;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(_observer, observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_observer, observer, cancel);
@@ -159,6 +163,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _onCompleted = onCompleted;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(this, observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/DoWhile.cs

@@ -17,6 +17,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/ElementAt.cs

@@ -15,6 +15,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _index = index;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(_index, observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_index, observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/ElementAtOrDefault.cs

@@ -15,6 +15,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _index = index;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(_index, observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_index, observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Empty.cs

@@ -15,6 +15,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _scheduler = scheduler;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Finally.cs

@@ -17,6 +17,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _finallyAction = finallyAction;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(_finallyAction, observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_finallyAction, observer, cancel);

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/FirstAsync.cs

@@ -15,6 +15,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _source = source;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(observer, cancel);
@@ -61,6 +63,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _predicate = predicate;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(_predicate, observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_predicate, observer, cancel);

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/FirstOrDefaultAsync.cs

@@ -15,6 +15,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _source = source;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(observer, cancel);
@@ -62,6 +64,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _predicate = predicate;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(_predicate, observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_predicate, observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/For.cs

@@ -17,6 +17,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/FromEvent.cs

@@ -206,6 +206,8 @@ namespace System.Reactive.Linq.ObservableImpl
 
         private Session _session;
 
+        protected override IDisposable CreateSink(IObserver<TArgs> observer, IDisposable cancel) => Disposable.Empty;
+
         protected override IDisposable Run(IObserver<TArgs> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var connection = default(IDisposable);

+ 6 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Generate.cs

@@ -26,6 +26,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _scheduler = scheduler;
             }
 
+            protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(this, observer, cancel);
@@ -175,6 +177,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _scheduler = scheduler;
             }
 
+            protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(this, observer, cancel);
@@ -273,6 +277,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _scheduler = scheduler;
             }
 
+            protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(this, observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/GroupBy.cs

@@ -25,6 +25,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _comparer = comparer;
         }
 
+        protected override IDisposable CreateSink(IObserver<IGroupedObservable<TKey, TElement>> observer, IDisposable cancel) => new _(this, observer, cancel);
+
         protected override IDisposable Run(IObserver<IGroupedObservable<TKey, TElement>> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(this, observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/GroupByUntil.cs

@@ -29,6 +29,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _comparer = comparer;
         }
 
+        protected override IDisposable CreateSink(IObserver<IGroupedObservable<TKey, TElement>> observer, IDisposable cancel) => new _(this, observer, cancel);
+
         protected override IDisposable Run(IObserver<IGroupedObservable<TKey, TElement>> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(this, observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/GroupJoin.cs

@@ -25,6 +25,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(this, observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/If.cs

@@ -21,6 +21,8 @@ namespace System.Reactive.Linq.ObservableImpl
 
         public IObservable<TResult> Eval() => _condition() ? _thenSource : _elseSource;
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(this, observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/IgnoreElements.cs

@@ -13,6 +13,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/IsEmpty.cs

@@ -13,6 +13,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<bool> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<bool> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Join.cs

@@ -24,6 +24,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(this, observer, cancel);

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/LastAsync.cs

@@ -15,6 +15,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _source = source;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(observer, cancel);
@@ -74,6 +76,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _predicate = predicate;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(_predicate, observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_predicate, observer, cancel);

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/LastOrDefaultAsync.cs

@@ -15,6 +15,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _source = source;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(observer, cancel);
@@ -63,6 +65,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _predicate = predicate;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(_predicate, observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_predicate, observer, cancel);

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/LongCount.cs

@@ -15,6 +15,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _source = source;
             }
 
+            protected override IDisposable CreateSink(IObserver<long> observer, IDisposable cancel) => new _(observer, cancel);
+
             protected override IDisposable Run(IObserver<long> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(observer, cancel);
@@ -74,6 +76,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _predicate = predicate;
             }
 
+            protected override IDisposable CreateSink(IObserver<long> observer, IDisposable cancel) => new _(_predicate, observer, cancel);
+
             protected override IDisposable Run(IObserver<long> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_predicate, observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Materialize.cs

@@ -15,6 +15,8 @@ namespace System.Reactive.Linq.ObservableImpl
 
         public IObservable<TSource> Dematerialize() => _source.AsObservable();
 
+        protected override IDisposable CreateSink(IObserver<Notification<TSource>> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<Notification<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);

+ 22 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Max.cs

@@ -17,6 +17,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _comparer = comparer;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => default(TSource) == null ? (IDisposable)new Null(_comparer, observer, cancel) : new NonNull(_comparer, observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             // LINQ to Objects makes this distinction in order to make [Max|Max] of an empty collection of reference type objects equal to null.
@@ -168,6 +170,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<double> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<double> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -235,6 +239,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<float> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<float> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -302,6 +308,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<decimal> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<decimal> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -369,6 +377,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<int> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<int> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -436,6 +446,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<long> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<long> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -503,6 +515,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<double?> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<double?> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -562,6 +576,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<float?> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<float?> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -621,6 +637,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<decimal?> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<decimal?> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -680,6 +698,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<int?> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<int?> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -739,6 +759,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<long?> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<long?> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/MaxBy.cs

@@ -19,6 +19,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _comparer = comparer;
         }
 
+        protected override IDisposable CreateSink(IObserver<IList<TSource>> observer, IDisposable cancel) => new _(this, observer, cancel);
+
         protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(this, observer, cancel);

+ 6 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Merge.cs

@@ -22,6 +22,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _maxConcurrent = maxConcurrent;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(_maxConcurrent, observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_maxConcurrent, observer, cancel);
@@ -168,6 +170,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _sources = sources;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(observer, cancel);
@@ -298,6 +302,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _sources = sources;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(observer, cancel);

+ 22 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Min.cs

@@ -17,6 +17,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _comparer = comparer;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => default(TSource) == null ? (IDisposable)new Null(_comparer, observer, cancel) : new NonNull(_comparer, observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             // LINQ to Objects makes this distinction in order to make [Min|Max] of an empty collection of reference type objects equal to null.
@@ -168,6 +170,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<double> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<double> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -235,6 +239,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<float> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<float> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -302,6 +308,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<decimal> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<decimal> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -369,6 +377,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<int> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<int> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -436,6 +446,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<long> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<long> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -503,6 +515,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<double?> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<double?> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -562,6 +576,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<float?> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<float?> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -621,6 +637,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<decimal?> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<decimal?> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -680,6 +698,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<int?> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<int?> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -739,6 +759,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<long?> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<long?> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/MinBy.cs

@@ -19,6 +19,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _comparer = comparer;
         }
 
+        protected override IDisposable CreateSink(IObserver<IList<TSource>> observer, IDisposable cancel) => new _(this, observer, cancel);
+
         protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(this, observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Multicast.cs

@@ -20,6 +20,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _selector = selector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/OfType.cs

@@ -13,6 +13,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/OnErrorResumeNext.cs

@@ -15,6 +15,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _sources = sources;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Range.cs

@@ -20,6 +20,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _scheduler = scheduler;
         }
 
+        protected override IDisposable CreateSink(IObserver<int> observer, IDisposable cancel) => new _(this, observer, cancel);
+
         protected override IDisposable Run(IObserver<int> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(this, observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs

@@ -23,6 +23,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _connectableSubscription = default(IDisposable);
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Repeat.cs

@@ -20,6 +20,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _scheduler = scheduler;
             }
 
+            protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_value, observer, cancel);
+
             protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_value, observer, cancel);
@@ -80,6 +82,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _repeatCount = repeatCount;
             }
 
+            protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_value, observer, cancel);
+
             protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_value, observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Return.cs

@@ -17,6 +17,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _scheduler = scheduler;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_value, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_value, observer, cancel);

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Sample.cs

@@ -18,6 +18,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _sampler = sampler;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -149,6 +151,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _scheduler = scheduler;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Scan.cs

@@ -17,6 +17,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _accumulator = accumulator;
         }
 
+        protected override IDisposable CreateSink(IObserver<TAccumulate> observer, IDisposable cancel) => new _(this, observer, cancel);
+
         protected override IDisposable Run(IObserver<TAccumulate> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(this, observer, cancel);
@@ -77,6 +79,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _accumulator = accumulator;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(_accumulator, observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_accumulator, observer, cancel);

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Select.cs

@@ -17,6 +17,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _selector = selector;
             }
 
+            protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_selector, observer, cancel);
+
             protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_selector, observer, cancel);
@@ -76,6 +78,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _selector = selector;
             }
 
+            protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_selector, observer, cancel);
+
             protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_selector, observer, cancel);

+ 26 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/SelectMany.cs

@@ -24,6 +24,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _resultSelector = resultSelector;
             }
 
+            protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(this, observer, cancel);
@@ -196,6 +198,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _resultSelector = resultSelector;
             }
 
+            protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(this, observer, cancel);
@@ -374,6 +378,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _resultSelector = resultSelector;
             }
 
+            protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(this, observer, cancel);
@@ -478,6 +484,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _resultSelector = resultSelector;
             }
 
+            protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(this, observer, cancel);
@@ -587,6 +595,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _resultSelector = resultSelector;
             }
 
+            protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(this, observer, cancel);
@@ -743,6 +753,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _resultSelector = resultSelector;
             }
 
+            protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(this, observer, cancel);
@@ -903,6 +915,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _selector = selector;
             }
 
+            protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(this, observer, cancel);
@@ -1064,6 +1078,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _selectorOnCompleted = selectorOnCompleted;
             }
 
+            protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(this, observer, cancel);
@@ -1152,6 +1168,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _selector = selector;
             }
 
+            protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(this, observer, cancel);
@@ -1408,6 +1426,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _selector = selector;
             }
 
+            protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(this, observer, cancel);
@@ -1508,6 +1528,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _selector = selector;
             }
 
+            protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(this, observer, cancel);
@@ -1610,6 +1632,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _selector = selector;
             }
 
+            protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(this, observer, cancel);
@@ -1738,6 +1762,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _selector = selector;
             }
 
+            protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(this, observer, cancel);

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/SequenceEqual.cs

@@ -22,6 +22,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _comparer = comparer;
             }
 
+            protected override IDisposable CreateSink(IObserver<bool> observer, IDisposable cancel) => new _(_comparer, observer, cancel);
+
             protected override IDisposable Run(IObserver<bool> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_comparer, observer, cancel);
@@ -231,6 +233,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _comparer = comparer;
             }
 
+            protected override IDisposable CreateSink(IObserver<bool> observer, IDisposable cancel) => new _(_comparer, observer, cancel);
+
             protected override IDisposable Run(IObserver<bool> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_comparer, observer, cancel);

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/SingleAsync.cs

@@ -15,6 +15,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _source = source;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(observer, cancel);
@@ -81,6 +83,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _predicate = predicate;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(_predicate, observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_predicate, observer, cancel);

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/SingleOrDefaultAsync.cs

@@ -15,6 +15,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _source = source;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(observer, cancel);
@@ -73,6 +75,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _predicate = predicate;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(_predicate, observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_predicate, observer, cancel);

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Skip.cs

@@ -32,6 +32,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 return new Count(_source, _count + count);
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(_count, observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_count, observer, cancel);
@@ -101,6 +103,8 @@ namespace System.Reactive.Linq.ObservableImpl
                     return new Time(_source, duration, _scheduler);
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(observer, cancel);

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipLast.cs

@@ -20,6 +20,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _count = count;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(_count, observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_count, observer, cancel);
@@ -73,6 +75,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _scheduler = scheduler;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(_duration, observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_duration, observer, cancel);

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipUntil.cs

@@ -18,6 +18,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _other = other;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -153,6 +155,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 return new SkipUntil<TSource>(_source, startTime, _scheduler);
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipWhile.cs

@@ -17,6 +17,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _predicate = predicate;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(_predicate, observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_predicate, observer, cancel);
@@ -88,6 +90,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _predicate = predicate;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(_predicate, observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_predicate, observer, cancel);

+ 20 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Sum.cs

@@ -13,6 +13,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<double> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<double> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -59,6 +61,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<float> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<float> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -105,6 +109,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<decimal> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<decimal> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -151,6 +157,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<int> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<int> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -208,6 +216,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<long> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<long> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -265,6 +275,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<double?> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<double?> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -312,6 +324,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<float?> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<float?> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -359,6 +373,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<decimal?> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<decimal?> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -406,6 +422,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<int?> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<int?> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -464,6 +482,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<long?> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<long?> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Switch.cs

@@ -15,6 +15,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _sources = sources;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Synchronize.cs

@@ -20,6 +20,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(_gate, observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_gate, observer, cancel);

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Take.cs

@@ -35,6 +35,8 @@ namespace System.Reactive.Linq.ObservableImpl
                     return new Count(_source, count);
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(_count, observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_count, observer, cancel);
@@ -111,6 +113,8 @@ namespace System.Reactive.Linq.ObservableImpl
                     return new Time(_source, duration, _scheduler);
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(observer, cancel);

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeLast.cs

@@ -23,6 +23,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _loopScheduler = loopScheduler;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(this, observer, cancel);
@@ -132,6 +134,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _loopScheduler = loopScheduler;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(this, observer, cancel);

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeLastBuffer.cs

@@ -20,6 +20,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _count = count;
             }
 
+            protected override IDisposable CreateSink(IObserver<IList<TSource>> observer, IDisposable cancel) => new _(_count, observer, cancel);
+
             protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_count, observer, cancel);
@@ -78,6 +80,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _scheduler = scheduler;
             }
 
+            protected override IDisposable CreateSink(IObserver<IList<TSource>> observer, IDisposable cancel) => new _(_duration, observer, cancel);
+
             protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_duration, observer, cancel);

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeUntil.cs

@@ -18,6 +18,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _other = other;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);
@@ -186,6 +188,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 return new TakeUntil<TSource>(_source, endTime, _scheduler);
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeWhile.cs

@@ -17,6 +17,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _predicate = predicate;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(_predicate, observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_predicate, observer, cancel);
@@ -88,6 +90,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _predicate = predicate;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(_predicate, observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_predicate, observer, cancel);

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs

@@ -20,6 +20,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _scheduler = scheduler;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(this, observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(this, observer, cancel);
@@ -129,6 +131,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _throttleSelector = throttleSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(this, observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(this, observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Throw.cs

@@ -17,6 +17,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _scheduler = scheduler;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_exception, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_exception, observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/TimeInterval.cs

@@ -17,6 +17,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _scheduler = scheduler;
         }
 
+        protected override IDisposable CreateSink(IObserver<System.Reactive.TimeInterval<TSource>> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<System.Reactive.TimeInterval<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);

+ 6 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Timeout.cs

@@ -24,6 +24,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _scheduler = scheduler;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(this, observer, cancel);
@@ -166,6 +168,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _scheduler = scheduler;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_other, observer, cancel);
@@ -278,6 +282,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _other = other;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(this, observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(this, observer, cancel);

+ 8 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Timer.cs

@@ -29,6 +29,8 @@ namespace System.Reactive.Linq.ObservableImpl
                     _dueTime = dueTime;
                 }
 
+                protected override IDisposable CreateSink(IObserver<long> observer, IDisposable cancel) => new _(observer, cancel);
+
                 protected override IDisposable Run(IObserver<long> observer, IDisposable cancel, Action<IDisposable> setSink)
                 {
                     var sink = new _(observer, cancel);
@@ -47,6 +49,8 @@ namespace System.Reactive.Linq.ObservableImpl
                     _dueTime = dueTime;
                 }
 
+                protected override IDisposable CreateSink(IObserver<long> observer, IDisposable cancel) => new _(observer, cancel);
+
                 protected override IDisposable Run(IObserver<long> observer, IDisposable cancel, Action<IDisposable> setSink)
                 {
                     var sink = new _(observer, cancel);
@@ -102,6 +106,8 @@ namespace System.Reactive.Linq.ObservableImpl
                     _dueTime = dueTime;
                 }
 
+                protected override IDisposable CreateSink(IObserver<long> observer, IDisposable cancel) => new _(_period, observer, cancel);
+
                 protected override IDisposable Run(IObserver<long> observer, IDisposable cancel, Action<IDisposable> setSink)
                 {
                     var sink = new _(_period, observer, cancel);
@@ -120,6 +126,8 @@ namespace System.Reactive.Linq.ObservableImpl
                     _dueTime = dueTime;
                 }
 
+                protected override IDisposable CreateSink(IObserver<long> observer, IDisposable cancel) => new _(_period, observer, cancel);
+
                 protected override IDisposable Run(IObserver<long> observer, IDisposable cancel, Action<IDisposable> setSink)
                 {
                     var sink = new _(_period, observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Timestamp.cs

@@ -17,6 +17,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _scheduler = scheduler;
         }
 
+        protected override IDisposable CreateSink(IObserver<Timestamped<TSource>> observer, IDisposable cancel) => new _(_scheduler, observer, cancel);
+
         protected override IDisposable Run(IObserver<Timestamped<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_scheduler, observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/ToArray.cs

@@ -15,6 +15,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource[]> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource[]> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/ToDictionary.cs

@@ -21,6 +21,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _comparer = comparer;
         }
 
+        protected override IDisposable CreateSink(IObserver<IDictionary<TKey, TElement>> observer, IDisposable cancel) => new _(this, observer, cancel);
+
         protected override IDisposable Run(IObserver<IDictionary<TKey, TElement>> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(this, observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/ToList.cs

@@ -15,6 +15,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<IList<TSource>> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/ToLookup.cs

@@ -22,6 +22,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _comparer = comparer;
         }
 
+        protected override IDisposable CreateSink(IObserver<ILookup<TKey, TElement>> observer, IDisposable cancel) => new _(this, observer, cancel);
+
         protected override IDisposable Run(IObserver<ILookup<TKey, TElement>> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(this, observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/ToObservable.cs

@@ -19,6 +19,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _scheduler = scheduler;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Using.cs

@@ -18,6 +18,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _observableFactory = observableFactory;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Where.cs

@@ -22,6 +22,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 return new Predicate(_source, x => _predicate(x) && predicate(x));
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(_predicate, observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_predicate, observer, cancel);
@@ -84,6 +86,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _predicate = predicate;
             }
 
+            protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(_predicate, observer, cancel);
+
             protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_predicate, observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/While.cs

@@ -17,6 +17,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _source = source;
         }
 
+        protected override IDisposable CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(observer, cancel);

+ 12 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Window.cs

@@ -24,6 +24,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _skip = skip;
             }
 
+            protected override IDisposable CreateSink(IObserver<IObservable<TSource>> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<IObservable<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(this, observer, cancel);
@@ -125,6 +127,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _scheduler = scheduler;
             }
 
+            protected override IDisposable CreateSink(IObserver<IObservable<TSource>> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<IObservable<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(this, observer, cancel);
@@ -287,6 +291,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _scheduler = scheduler;
             }
 
+            protected override IDisposable CreateSink(IObserver<IObservable<TSource>> observer, IDisposable cancel) => new _(observer, cancel);
+
             protected override IDisposable Run(IObserver<IObservable<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(observer, cancel);
@@ -381,6 +387,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _scheduler = scheduler;
             }
 
+            protected override IDisposable CreateSink(IObserver<IObservable<TSource>> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<IObservable<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(this, observer, cancel);
@@ -517,6 +525,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _windowClosingSelector = windowClosingSelector;
             }
 
+            protected override IDisposable CreateSink(IObserver<IObservable<TSource>> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<IObservable<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(this, observer, cancel);
@@ -664,6 +674,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _windowBoundaries = windowBoundaries;
             }
 
+            protected override IDisposable CreateSink(IObserver<IObservable<TSource>> observer, IDisposable cancel) => new _(this, observer, cancel);
+
             protected override IDisposable Run(IObserver<IObservable<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(this, observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/WithLatestFrom.cs

@@ -19,6 +19,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);

+ 29 - 1
Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.Generated.cs

@@ -10,7 +10,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
     /* The following code is generated by a T4 template. */
 
-    #region Zip auto-generated code (4/17/2017 4:46:17 PM)
+    #region Zip auto-generated code (4/20/2017 2:55:11 PM)
 
     internal sealed class Zip<T1, T2, T3, TResult> : Producer<TResult>
     {
@@ -27,6 +27,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);
@@ -102,6 +104,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);
@@ -187,6 +191,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);
@@ -282,6 +288,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);
@@ -387,6 +395,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);
@@ -502,6 +512,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);
@@ -627,6 +639,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);
@@ -762,6 +776,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);
@@ -907,6 +923,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);
@@ -1062,6 +1080,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);
@@ -1227,6 +1247,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);
@@ -1402,6 +1424,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);
@@ -1587,6 +1611,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);
@@ -1782,6 +1808,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.Generated.tt

@@ -51,6 +51,8 @@ for (var j = 1; j <= i; j++)
             _resultSelector = resultSelector;
         }
 
+        protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(_resultSelector, observer, cancel);

+ 6 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs

@@ -26,6 +26,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _resultSelector = resultSelector;
             }
 
+            protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
             protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_resultSelector, observer, cancel);
@@ -257,6 +259,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 _resultSelector = resultSelector;
             }
 
+            protected override IDisposable CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(_resultSelector, observer, cancel);
+
             protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(_resultSelector, observer, cancel);
@@ -539,6 +543,8 @@ namespace System.Reactive.Linq.ObservableImpl
             _sources = sources;
         }
 
+        protected override IDisposable CreateSink(IObserver<IList<TSource>> observer, IDisposable cancel) => new _(this, observer, cancel);
+
         protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(this, observer, cancel);