Quellcode durchsuchen

Adding WithLatestFrom operator.

Bart De Smet vor 10 Jahren
Ursprung
Commit
d7b4d31181

+ 1 - 1
Rx.NET/Source/System.Reactive.Experimental/Reactive/Linq/QbservableEx.Generated.cs

@@ -1,5 +1,5 @@
 /*
- * WARNING: Auto-generated file (4/16/2015 23:47:35)
+ * WARNING: Auto-generated file (5/1/2015 21:21:20)
  * Run Rx's auto-homoiconizer tool to generate this file (in the HomoIcon directory).
  */
 

+ 1 - 0
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/IQueryLanguage.cs

@@ -605,6 +605,7 @@ namespace System.Reactive.Linq
         IObservable<IObservable<TSource>> Window<TSource, TWindowClosing>(IObservable<TSource> source, Func<IObservable<TWindowClosing>> windowClosingSelector);
         IObservable<IObservable<TSource>> Window<TSource, TWindowOpening, TWindowClosing>(IObservable<TSource> source, IObservable<TWindowOpening> windowOpenings, Func<TWindowOpening, IObservable<TWindowClosing>> windowClosingSelector);
         IObservable<IObservable<TSource>> Window<TSource, TWindowBoundary>(IObservable<TSource> source, IObservable<TWindowBoundary> windowBoundaries);
+        IObservable<TResult> WithLatestFrom<TFirst, TSecond, TResult>(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector);
         IObservable<TResult> Zip<TFirst, TSecond, TResult>(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector);
         IObservable<TResult> Zip<TSource, TResult>(IEnumerable<IObservable<TSource>> sources, Func<IList<TSource>, TResult> resultSelector);
         IObservable<IList<TSource>> Zip<TSource>(IEnumerable<IObservable<TSource>> sources);

+ 27 - 0
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Multiple.cs

@@ -1532,6 +1532,33 @@ namespace System.Reactive.Linq
 
         #endregion
 
+        #region + WithLatestFrom +
+
+        /// <summary>
+        /// Merges two observable sequences into one observable sequence by combining each element from the first source with the latest element from the second source, if any.
+        /// </summary>
+        /// <typeparam name="TFirst">The type of the elements in the first source sequence.</typeparam>
+        /// <typeparam name="TSecond">The type of the elements in the second source sequence.</typeparam>
+        /// <typeparam name="TResult">The type of the elements in the result sequence, returned by the selector function.</typeparam>
+        /// <param name="first">First observable source.</param>
+        /// <param name="second">Second observable source.</param>
+        /// <param name="resultSelector">Function to invoke for each element from the first source combined with the latest element from the second source, if any.</param>
+        /// <returns>An observable sequence containing the result of combining each element of the first source with the latest element from the second source, if any, using the specified result selector function.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="first"/> or <paramref name="second"/> or <paramref name="resultSelector"/> is null.</exception>
+        public static IObservable<TResult> WithLatestFrom<TFirst, TSecond, TResult>(this IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
+        {
+            if (first == null)
+                throw new ArgumentNullException("first");
+            if (second == null)
+                throw new ArgumentNullException("second");
+            if (resultSelector == null)
+                throw new ArgumentNullException("resultSelector");
+
+            return s_impl.WithLatestFrom<TFirst, TSecond, TResult>(first, second, resultSelector);
+        }
+
+        #endregion
+
         #region + Zip +
 
         /// <summary>

+ 147 - 0
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/WithLatestFrom.cs

@@ -0,0 +1,147 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if !NO_PERF
+using System;
+using System.Reactive.Disposables;
+
+namespace System.Reactive.Linq.ObservableImpl
+{
+    class WithLatestFrom<TFirst, TSecond, TResult> : Producer<TResult>
+    {
+        private readonly IObservable<TFirst> _first;
+        private readonly IObservable<TSecond> _second;
+        private readonly Func<TFirst, TSecond, TResult> _resultSelector;
+
+        public WithLatestFrom(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
+        {
+            _first = first;
+            _second = second;
+            _resultSelector = resultSelector;
+        }
+
+        protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
+        {
+            var sink = new _(this, observer, cancel);
+            setSink(sink);
+            return sink.Run();
+        }
+
+        class _ : Sink<TResult>
+        {
+            private readonly WithLatestFrom<TFirst, TSecond, TResult> _parent;
+
+            public _(WithLatestFrom<TFirst, TSecond, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+                : base(observer, cancel)
+            {
+                _parent = parent;
+            }
+
+            private object _gate;
+            private bool _hasLatest;
+            private TSecond _latest;
+
+            public IDisposable Run()
+            {
+                _gate = new object();
+
+                var sndSubscription = new SingleAssignmentDisposable();
+
+                var fstO = new F(this);
+                var sndO = new S(this, sndSubscription);
+
+                var fstSubscription = _parent._first.SubscribeSafe(fstO);
+                sndSubscription.Disposable = _parent._second.SubscribeSafe(sndO);
+
+                return StableCompositeDisposable.Create(fstSubscription, sndSubscription);
+            }
+
+            class F : IObserver<TFirst>
+            {
+                private readonly _ _parent;
+
+                public F(_ parent)
+                {
+                    _parent = parent;
+                }
+
+                public void OnCompleted()
+                {
+                    lock (_parent._gate)
+                    {
+                        _parent._observer.OnCompleted();
+                        _parent.Dispose();
+                    }
+                }
+
+                public void OnError(Exception error)
+                {
+                    lock (_parent._gate)
+                    {
+                        _parent._observer.OnError(error);
+                        _parent.Dispose();
+                    }
+                }
+
+                public void OnNext(TFirst value)
+                {
+                    lock (_parent._gate)
+                    {
+                        if (_parent._hasLatest)
+                        {
+                            var res = default(TResult);
+
+                            try
+                            {
+                                res = _parent._parent._resultSelector(value, _parent._latest);
+                            }
+                            catch (Exception ex)
+                            {
+                                _parent._observer.OnError(ex);
+                                _parent.Dispose();
+                                return;
+                            }
+
+                            _parent._observer.OnNext(res);
+                        }
+                    }
+                }
+            }
+
+            class S : IObserver<TSecond>
+            {
+                private readonly _ _parent;
+                private readonly IDisposable _self;
+
+                public S(_ parent, IDisposable self)
+                {
+                    _parent = parent;
+                    _self = self;
+                }
+
+                public void OnCompleted()
+                {
+                    _self.Dispose();
+                }
+
+                public void OnError(Exception error)
+                {
+                    lock (_parent._gate)
+                    {
+                        _parent._observer.OnError(error);
+                        _parent.Dispose();
+                    }
+                }
+
+                public void OnNext(TSecond value)
+                {
+                    lock (_parent._gate)
+                    {
+                        _parent._latest = value;
+                        _parent._hasLatest = true;
+                    }
+                }
+            }
+        }
+    }
+}
+#endif

+ 9 - 0
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Multiple.cs

@@ -1354,6 +1354,15 @@ namespace System.Reactive.Linq
 
         #endregion
 
+        #region + WithLatestFrom +
+
+        public virtual IObservable<TResult> WithLatestFrom<TFirst, TSecond, TResult>(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
+        {
+            return new WithLatestFrom<TFirst, TSecond, TResult>(first, second, resultSelector);
+        }
+
+        #endregion
+
         #region + Zip +
 
         public virtual IObservable<TResult> Zip<TFirst, TSecond, TResult>(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)

+ 1 - 0
Rx.NET/Source/System.Reactive.Linq/System.Reactive.Linq.csproj

@@ -63,6 +63,7 @@
     <Compile Include="Reactive\Linq\Observable\RefCount.cs" />
     <Compile Include="Reactive\Linq\Observable\Multicast.cs" />
     <Compile Include="Reactive\Linq\Observable\GroupBy.cs" />
+    <Compile Include="Reactive\Linq\Observable\WithLatestFrom.cs" />
     <Compile Include="Reactive\Linq\Observable_.cs" />
     <Compile Include="Reactive\Linq\QueryLanguage_.cs" />
     <Compile Include="Reactive\Linq\QueryLanguage.Joins.cs" />

+ 1 - 1
Rx.NET/Source/System.Reactive.Observable.Aliases/Qbservable.Aliases.Generated.cs

@@ -1,5 +1,5 @@
 /*
- * WARNING: Auto-generated file (4/16/2015 23:47:35)
+ * WARNING: Auto-generated file (5/1/2015 21:21:20)
  * Run Rx's auto-homoiconizer tool to generate this file (in the HomoIcon directory).
  */
 

+ 57 - 21
Rx.NET/Source/System.Reactive.Providers/Reactive/Linq/Qbservable.Generated.cs

@@ -1,5 +1,5 @@
 /*
- * WARNING: Auto-generated file (4/16/2015 23:47:35)
+ * WARNING: Auto-generated file (5/1/2015 21:21:20)
  * Run Rx's auto-homoiconizer tool to generate this file (in the HomoIcon directory).
  */
 
@@ -12451,19 +12451,19 @@ namespace System.Reactive.Linq
         
 #if !NO_TPL
         /// <summary>
-        /// Projects each element of an observable sequence to a task by incorporating the element's index, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
+        /// Projects each element of an observable sequence to a task, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
         /// <typeparam name="TTaskResult">The type of the results produced by the projected intermediate tasks.</typeparam>
         /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate task results.</typeparam>
         /// <param name="source">An observable sequence of elements to project.</param>
-        /// <param name="taskSelector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
-        /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence; the second parameter of the function represents the index of the source element.</param>
+        /// <param name="taskSelector">A transform function to apply to each element.</param>
+        /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence.</param>
         /// <returns>An observable sequence whose elements are the result of obtaining a task for each element of the input sequence and then mapping the task's result and its corresponding source element to a result element.</returns>
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="taskSelector" /> or <paramref name="resultSelector" /> is null.</exception>
         /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="M:System.Reactive.Threading.Tasks.TaskObservableExtensions.ToObservable``1(System.Threading.Tasks.Task{``0})" />.</remarks>
-        public static IQbservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, Task<TTaskResult>>> taskSelector, Expression<Func<TSource, int, TTaskResult, TResult>> resultSelector)
+        public static IQbservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, Task<TTaskResult>>> taskSelector, Expression<Func<TSource, TTaskResult, TResult>> resultSelector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -12476,7 +12476,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TTaskResult, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, Task<TTaskResult>>>), default(Expression<Func<TSource, int, TTaskResult, TResult>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TTaskResult, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, Task<TTaskResult>>>), default(Expression<Func<TSource, TTaskResult, TResult>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TTaskResult), typeof(TResult)),
 #endif
@@ -12490,19 +12490,19 @@ namespace System.Reactive.Linq
         
 #if !NO_TPL
         /// <summary>
-        /// Projects each element of an observable sequence to a task with cancellation support, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
+        /// Projects each element of an observable sequence to a task by incorporating the element's index, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
         /// <typeparam name="TTaskResult">The type of the results produced by the projected intermediate tasks.</typeparam>
         /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate task results.</typeparam>
         /// <param name="source">An observable sequence of elements to project.</param>
-        /// <param name="taskSelector">A transform function to apply to each element.</param>
-        /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence.</param>
+        /// <param name="taskSelector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
+        /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence; the second parameter of the function represents the index of the source element.</param>
         /// <returns>An observable sequence whose elements are the result of obtaining a task for each element of the input sequence and then mapping the task's result and its corresponding source element to a result element.</returns>
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="taskSelector" /> or <paramref name="resultSelector" /> is null.</exception>
         /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="M:System.Reactive.Threading.Tasks.TaskObservableExtensions.ToObservable``1(System.Threading.Tasks.Task{``0})" />.</remarks>
-        public static IQbservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, CancellationToken, Task<TTaskResult>>> taskSelector, Expression<Func<TSource, TTaskResult, TResult>> resultSelector)
+        public static IQbservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, Task<TTaskResult>>> taskSelector, Expression<Func<TSource, int, TTaskResult, TResult>> resultSelector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -12515,7 +12515,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TTaskResult, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, CancellationToken, Task<TTaskResult>>>), default(Expression<Func<TSource, TTaskResult, TResult>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TTaskResult, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, Task<TTaskResult>>>), default(Expression<Func<TSource, int, TTaskResult, TResult>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TTaskResult), typeof(TResult)),
 #endif
@@ -12529,19 +12529,19 @@ namespace System.Reactive.Linq
         
 #if !NO_TPL
         /// <summary>
-        /// Projects each element of an observable sequence to a task by incorporating the element's index with cancellation support, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
+        /// Projects each element of an observable sequence to a task with cancellation support, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
         /// <typeparam name="TTaskResult">The type of the results produced by the projected intermediate tasks.</typeparam>
         /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate task results.</typeparam>
         /// <param name="source">An observable sequence of elements to project.</param>
-        /// <param name="taskSelector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
-        /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence; the second parameter of the function represents the index of the source element.</param>
+        /// <param name="taskSelector">A transform function to apply to each element.</param>
+        /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence.</param>
         /// <returns>An observable sequence whose elements are the result of obtaining a task for each element of the input sequence and then mapping the task's result and its corresponding source element to a result element.</returns>
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="taskSelector" /> or <paramref name="resultSelector" /> is null.</exception>
         /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="M:System.Reactive.Threading.Tasks.TaskObservableExtensions.ToObservable``1(System.Threading.Tasks.Task{``0})" />.</remarks>
-        public static IQbservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, CancellationToken, Task<TTaskResult>>> taskSelector, Expression<Func<TSource, int, TTaskResult, TResult>> resultSelector)
+        public static IQbservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, CancellationToken, Task<TTaskResult>>> taskSelector, Expression<Func<TSource, TTaskResult, TResult>> resultSelector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -12554,7 +12554,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TTaskResult, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, CancellationToken, Task<TTaskResult>>>), default(Expression<Func<TSource, int, TTaskResult, TResult>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TTaskResult, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, CancellationToken, Task<TTaskResult>>>), default(Expression<Func<TSource, TTaskResult, TResult>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TTaskResult), typeof(TResult)),
 #endif
@@ -12568,19 +12568,19 @@ namespace System.Reactive.Linq
         
 #if !NO_TPL
         /// <summary>
-        /// Projects each element of an observable sequence to a task, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
+        /// Projects each element of an observable sequence to a task by incorporating the element's index with cancellation support, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
         /// <typeparam name="TTaskResult">The type of the results produced by the projected intermediate tasks.</typeparam>
         /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate task results.</typeparam>
         /// <param name="source">An observable sequence of elements to project.</param>
-        /// <param name="taskSelector">A transform function to apply to each element.</param>
-        /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence.</param>
+        /// <param name="taskSelector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
+        /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence; the second parameter of the function represents the index of the source element.</param>
         /// <returns>An observable sequence whose elements are the result of obtaining a task for each element of the input sequence and then mapping the task's result and its corresponding source element to a result element.</returns>
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="taskSelector" /> or <paramref name="resultSelector" /> is null.</exception>
         /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="M:System.Reactive.Threading.Tasks.TaskObservableExtensions.ToObservable``1(System.Threading.Tasks.Task{``0})" />.</remarks>
-        public static IQbservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, Task<TTaskResult>>> taskSelector, Expression<Func<TSource, TTaskResult, TResult>> resultSelector)
+        public static IQbservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, CancellationToken, Task<TTaskResult>>> taskSelector, Expression<Func<TSource, int, TTaskResult, TResult>> resultSelector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -12593,7 +12593,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TTaskResult, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, Task<TTaskResult>>>), default(Expression<Func<TSource, TTaskResult, TResult>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TTaskResult, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, CancellationToken, Task<TTaskResult>>>), default(Expression<Func<TSource, int, TTaskResult, TResult>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TTaskResult), typeof(TResult)),
 #endif
@@ -17396,6 +17396,42 @@ namespace System.Reactive.Linq
             );
         }
         
+        /// <summary>
+        /// Merges two observable sequences into one observable sequence by combining each element from the first source with the latest element from the second source, if any.
+        /// </summary>
+        /// <typeparam name="TFirst">The type of the elements in the first source sequence.</typeparam>
+        /// <typeparam name="TSecond">The type of the elements in the second source sequence.</typeparam>
+        /// <typeparam name="TResult">The type of the elements in the result sequence, returned by the selector function.</typeparam>
+        /// <param name="first">First observable source.</param>
+        /// <param name="second">Second observable source.</param>
+        /// <param name="resultSelector">Function to invoke for each element from the first source combined with the latest element from the second source, if any.</param>
+        /// <returns>An observable sequence containing the result of combining each element of the first source with the latest element from the second source, if any, using the specified result selector function.</returns>
+        /// <exception cref="T:System.ArgumentNullException">
+        /// <paramref name="first" /> or <paramref name="second" /> or <paramref name="resultSelector" /> is null.</exception>
+        public static IQbservable<TResult> WithLatestFrom<TFirst, TSecond, TResult>(this IQbservable<TFirst> first, IObservable<TSecond> second, Expression<Func<TFirst, TSecond, TResult>> resultSelector)
+        {
+            if (first == null)
+                throw new ArgumentNullException("first");
+            if (second == null)
+                throw new ArgumentNullException("second");
+            if (resultSelector == null)
+                throw new ArgumentNullException("resultSelector");
+            
+            return first.Provider.CreateQuery<TResult>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.WithLatestFrom<TFirst, TSecond, TResult>(default(IQbservable<TFirst>), default(IObservable<TSecond>), default(Expression<Func<TFirst, TSecond, TResult>>))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TFirst), typeof(TSecond), typeof(TResult)),
+#endif
+                    first.Expression,
+                    GetSourceExpression(second),
+                    resultSelector
+                )
+            );
+        }
+        
         /// <summary>
         /// Merges the specified observable sequences into one observable sequence by emitting a list with the elements of the observable sequences at corresponding indexes.
         /// </summary>

+ 295 - 0
Rx.NET/Source/Tests.System.Reactive/Tests/Linq/ObservableMultipleTest.cs

@@ -11172,6 +11172,301 @@ namespace ReactiveTests.Tests
 
         #endregion
 
+        #region + WithLatestFrom +
+
+        [TestMethod]
+        public void WithLatestFrom_ArgumentChecking()
+        {
+            var someObservable = DummyObservable<int>.Instance;
+
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.WithLatestFrom<int, int, int>(someObservable, someObservable, null));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.WithLatestFrom<int, int, int>(null, someObservable, (_, __) => 0));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.WithLatestFrom<int, int, int>(someObservable, default(IObservable<int>), (_, __) => 0));
+        }
+
+        [TestMethod]
+        public void WithLatestFrom_Simple1()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateHotObservable(
+                OnNext(90, 1),
+                OnNext(180, 2),
+                OnNext(250, 3),
+                OnNext(260, 4),
+                OnNext(310, 5),
+                OnNext(340, 6),
+                OnNext(410, 7),
+                OnNext(420, 8),
+                OnNext(470, 9),
+                OnNext(550, 10),
+                OnCompleted<int>(590)
+            );
+
+            var ys = scheduler.CreateHotObservable(
+                OnNext(255, "bar"),
+                OnNext(330, "foo"),
+                OnNext(350, "qux"),
+                OnCompleted<string>(400)
+            );
+
+            var res = scheduler.Start(() =>
+                xs.WithLatestFrom(ys, (x, y) => x + y)
+            );
+
+            res.Messages.AssertEqual(
+                OnNext(260, "4bar"),
+                OnNext(310, "5bar"),
+                OnNext(340, "6foo"),
+                OnNext(410, "7qux"),
+                OnNext(420, "8qux"),
+                OnNext(470, "9qux"),
+                OnNext(550, "10qux"),
+                OnCompleted<string>(590)
+            );
+
+            xs.Subscriptions.AssertEqual(
+                Subscribe(200, 590)
+            );
+
+            ys.Subscriptions.AssertEqual(
+                Subscribe(200, 400)
+            );
+        }
+
+        [TestMethod]
+        public void WithLatestFrom_Simple2()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateHotObservable(
+                OnNext(90, 1),
+                OnNext(180, 2),
+                OnNext(250, 3),
+                OnNext(260, 4),
+                OnNext(310, 5),
+                OnNext(340, 6),
+                OnCompleted<int>(390)
+            );
+
+            var ys = scheduler.CreateHotObservable(
+                OnNext(255, "bar"),
+                OnNext(330, "foo"),
+                OnNext(350, "qux"),
+                OnNext(370, "baz"),
+                OnCompleted<string>(400)
+            );
+
+            var res = scheduler.Start(() =>
+                xs.WithLatestFrom(ys, (x, y) => x + y)
+            );
+
+            res.Messages.AssertEqual(
+                OnNext(260, "4bar"),
+                OnNext(310, "5bar"),
+                OnNext(340, "6foo"),
+                OnCompleted<string>(390)
+            );
+
+            xs.Subscriptions.AssertEqual(
+                Subscribe(200, 390)
+            );
+
+            ys.Subscriptions.AssertEqual(
+                Subscribe(200, 390)
+            );
+        }
+
+        [TestMethod]
+        public void WithLatestFrom_Simple3()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateHotObservable(
+                OnNext(90, 1),
+                OnNext(180, 2),
+                OnNext(250, 3),
+                OnNext(260, 4),
+                OnNext(310, 5),
+                OnNext(340, 6),
+                OnCompleted<int>(390)
+            );
+
+            var ys = scheduler.CreateHotObservable(
+                OnNext(245, "bar"),
+                OnNext(330, "foo"),
+                OnNext(350, "qux"),
+                OnNext(370, "baz"),
+                OnCompleted<string>(400)
+            );
+
+            var res = scheduler.Start(() =>
+                xs.WithLatestFrom(ys, (x, y) => x + y)
+            );
+
+            res.Messages.AssertEqual(
+                OnNext(250, "3bar"),
+                OnNext(260, "4bar"),
+                OnNext(310, "5bar"),
+                OnNext(340, "6foo"),
+                OnCompleted<string>(390)
+            );
+
+            xs.Subscriptions.AssertEqual(
+                Subscribe(200, 390)
+            );
+
+            ys.Subscriptions.AssertEqual(
+                Subscribe(200, 390)
+            );
+        }
+
+        [TestMethod]
+        public void WithLatestFrom_Error1()
+        {
+            var scheduler = new TestScheduler();
+
+            var ex = new Exception();
+
+            var xs = scheduler.CreateHotObservable(
+                OnNext(90, 1),
+                OnNext(180, 2),
+                OnNext(250, 3),
+                OnNext(260, 4),
+                OnNext(310, 5),
+                OnNext(340, 6),
+                OnNext(410, 7),
+                OnNext(420, 8),
+                OnNext(470, 9),
+                OnNext(550, 10),
+                OnError<int>(590, ex)
+            );
+
+            var ys = scheduler.CreateHotObservable(
+                OnNext(255, "bar"),
+                OnNext(330, "foo"),
+                OnNext(350, "qux"),
+                OnCompleted<string>(400)
+            );
+
+            var res = scheduler.Start(() =>
+                xs.WithLatestFrom(ys, (x, y) => x + y)
+            );
+
+            res.Messages.AssertEqual(
+                OnNext(260, "4bar"),
+                OnNext(310, "5bar"),
+                OnNext(340, "6foo"),
+                OnNext(410, "7qux"),
+                OnNext(420, "8qux"),
+                OnNext(470, "9qux"),
+                OnNext(550, "10qux"),
+                OnError<string>(590, ex)
+            );
+
+            xs.Subscriptions.AssertEqual(
+                Subscribe(200, 590)
+            );
+
+            ys.Subscriptions.AssertEqual(
+                Subscribe(200, 400)
+            );
+        }
+
+        [TestMethod]
+        public void WithLatestFrom_Error2()
+        {
+            var scheduler = new TestScheduler();
+
+            var ex = new Exception();
+
+            var xs = scheduler.CreateHotObservable(
+                OnNext(90, 1),
+                OnNext(180, 2),
+                OnNext(250, 3),
+                OnNext(260, 4),
+                OnNext(310, 5),
+                OnNext(340, 6),
+                OnCompleted<int>(390)
+            );
+
+            var ys = scheduler.CreateHotObservable(
+                OnNext(255, "bar"),
+                OnNext(330, "foo"),
+                OnNext(350, "qux"),
+                OnError<string>(370, ex)
+            );
+
+            var res = scheduler.Start(() =>
+                xs.WithLatestFrom(ys, (x, y) => x + y)
+            );
+
+            res.Messages.AssertEqual(
+                OnNext(260, "4bar"),
+                OnNext(310, "5bar"),
+                OnNext(340, "6foo"),
+                OnError<string>(370, ex)
+            );
+
+            xs.Subscriptions.AssertEqual(
+                Subscribe(200, 370)
+            );
+
+            ys.Subscriptions.AssertEqual(
+                Subscribe(200, 370)
+            );
+        }
+
+        [TestMethod]
+        public void WithLatestFrom_Error3()
+        {
+            var scheduler = new TestScheduler();
+
+            var ex = new Exception();
+
+            var xs = scheduler.CreateHotObservable(
+                OnNext(90, 1),
+                OnNext(180, 2),
+                OnNext(250, 3),
+                OnNext(260, 4),
+                OnNext(310, 5),
+                OnNext(340, 6),
+                OnCompleted<int>(390)
+            );
+
+            var ys = scheduler.CreateHotObservable(
+                OnNext(255, "bar"),
+                OnNext(330, "foo"),
+                OnNext(350, "qux"),
+                OnCompleted<string>(400)
+            );
+
+            var res = scheduler.Start(() =>
+                xs.WithLatestFrom(ys, (x, y) =>
+                {
+                    if (x == 5)
+                        throw ex;
+
+                    return x + y;
+                })
+            );
+
+            res.Messages.AssertEqual(
+                OnNext(260, "4bar"),
+                OnError<string>(310, ex)
+            );
+
+            xs.Subscriptions.AssertEqual(
+                Subscribe(200, 310)
+            );
+
+            ys.Subscriptions.AssertEqual(
+                Subscribe(200, 310)
+            );
+        }
+
+        #endregion
+
         #region + Zip +
 
         #region ArgumentChecking

+ 14 - 0
Rx.NET/Source/Tests.System.Reactive/Tests/Linq/QbservableTest.cs

@@ -1499,6 +1499,20 @@ namespace ReactiveTests.Tests
             _qbp.While(() => true, _qbMy);
         }
 
+        [TestMethod]
+        public void WithLatestFrom_ArgumentNullChecks()
+        {
+            ReactiveAssert.Throws<ArgumentNullException>(() => Qbservable.WithLatestFrom(_qbNull, _qbMy, (a, b) => a + b));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Qbservable.WithLatestFrom(_qbMy, _qbNull, (a, b) => a + b));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Qbservable.WithLatestFrom(_qbMy, _qbMy, default(Expression<Func<int, int, int>>)));
+        }
+
+        [TestMethod]
+        public void WithLatestFrom()
+        {
+            _qbMy.WithLatestFrom(_qbMy, (a, b) => a + b);
+        }
+
         [TestMethod]
         public void Zip_ArgumentNullChecks()
         {