Sfoglia il codice sorgente

Implementing scheduler rendez-vous.

Bart De Smet 8 anni fa
parent
commit
6a782aec57
22 ha cambiato i file con 329 aggiunte e 190 eliminazioni
  1. 22 1
      AsyncRx.NET/Playground/Program.cs
  2. 126 8
      AsyncRx.NET/System.Reactive.Async.Concurrency/System/Reactive/Concurrency/AsyncScheduler.cs
  3. 3 3
      AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Buffer.cs
  4. 4 4
      AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Delay.cs
  5. 1 1
      AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Empty.cs
  6. 22 22
      AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Generate.cs
  7. 4 4
      AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/ObserveOn.cs
  8. 2 2
      AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Range.cs
  9. 3 3
      AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Repeat.cs
  10. 2 2
      AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Return.cs
  11. 1 1
      AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Skip.cs
  12. 1 1
      AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/SkipUntil.cs
  13. 3 3
      AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/SubscribeOn.cs
  14. 2 2
      AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Take.cs
  15. 4 4
      AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/TakeLast.cs
  16. 2 2
      AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/TakeUntil.cs
  17. 1 1
      AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Throw.cs
  18. 2 2
      AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Timeout.cs
  19. 8 8
      AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Timer.cs
  20. 102 102
      AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/ToAsync.Generated.cs
  21. 6 6
      AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/ToAsync.Generated.tt
  22. 8 8
      AsyncRx.NET/System.Reactive.Async.Linq/System/Threading/Tasks/TaskAsyncObservableExtensions.cs

+ 22 - 1
AsyncRx.NET/Playground/Program.cs

@@ -173,7 +173,7 @@ namespace Playground
 
         static async Task RangeAsync()
         {
-            await AsyncObservable.Range(0, 10).SubscribeAsync(Print<int>()); // TODO: Use ForEachAsync.
+            await AsyncObservable.Range(0, 10).SubscribeAsync(PrintAsync<int>()); // TODO: Use ForEachAsync.
         }
 
         static async Task ReplaySubjectAsync()
@@ -267,5 +267,26 @@ namespace Playground
                 }
             );
         }
+
+        static IAsyncObserver<T> PrintAsync<T>()
+        {
+            return AsyncObserver.Create<T>(
+                async x =>
+                {
+                    await Task.Yield();
+                    Console.WriteLine(x);
+                },
+                async ex =>
+                {
+                    await Task.Yield();
+                    Console.WriteLine("Error: " + ex);
+                },
+                async () =>
+                {
+                    await Task.Yield();
+                    Console.WriteLine("Completed");
+                }
+            );
+        }
     }
 }

+ 126 - 8
AsyncRx.NET/System.Reactive.Async.Concurrency/System/Reactive/Concurrency/AsyncScheduler.cs

@@ -11,8 +11,6 @@ namespace System.Reactive.Concurrency
 {
     public static class AsyncScheduler
     {
-        // TODO: Implement proper RendezVous semantics.
-
         public static IAwaitable RendezVous(this IAsyncScheduler scheduler)
         {
             if (scheduler == null)
@@ -31,24 +29,28 @@ namespace System.Reactive.Concurrency
             return new RendezVousAwaitable(scheduler, token);
         }
 
-        public static ConfiguredTaskAwaitable RendezVous(this Task task, IAsyncScheduler scheduler)
+        public static IAwaitable RendezVous(this Task task, IAsyncScheduler scheduler) => RendezVous(task, scheduler, CancellationToken.None);
+
+        public static IAwaitable RendezVous(this Task task, IAsyncScheduler scheduler, CancellationToken token)
         {
             if (task == null)
                 throw new ArgumentNullException(nameof(task));
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            return task.ConfigureAwait(true);
+            return new TaskAwaitable(task, scheduler, token);
         }
 
-        public static ConfiguredTaskAwaitable<T> RendezVous<T>(this Task<T> task, IAsyncScheduler scheduler)
+        public static IAwaitable<T> RendezVous<T>(this Task<T> task, IAsyncScheduler scheduler) => RendezVous(task, scheduler, CancellationToken.None);
+
+        public static IAwaitable<T> RendezVous<T>(this Task<T> task, IAsyncScheduler scheduler, CancellationToken token)
         {
             if (task == null)
                 throw new ArgumentNullException(nameof(task));
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            return task.ConfigureAwait(true);
+            return new TaskAwaitable<T>(task, scheduler, token);
         }
 
         public static async Task Delay(this IAsyncScheduler scheduler, TimeSpan dueTime, CancellationToken token = default(CancellationToken))
@@ -115,7 +117,7 @@ namespace System.Reactive.Concurrency
                 {
                     ct.ThrowIfCancellationRequested();
 
-                    await action(ct).RendezVous(scheduler);
+                    await action(ct).RendezVous(scheduler, ct);
                 }
                 catch (OperationCanceledException ex) when (ex.CancellationToken == ct)
                 {
@@ -159,7 +161,7 @@ namespace System.Reactive.Concurrency
                 {
                     ct.ThrowIfCancellationRequested();
 
-                    res = await action(ct).RendezVous(scheduler);
+                    res = await action(ct).RendezVous(scheduler, ct);
                 }
                 catch (OperationCanceledException ex) when (ex.CancellationToken == ct)
                 {
@@ -243,5 +245,121 @@ namespace System.Reactive.Concurrency
                 }, _token);
             }
         }
+
+        private sealed class TaskAwaitable : IAwaitable, IAwaiter
+        {
+            private readonly TaskAwaiter _task;
+            private readonly IAsyncScheduler _scheduler;
+            private readonly CancellationToken _token;
+
+            public TaskAwaitable(Task task, IAsyncScheduler scheduler, CancellationToken token)
+            {
+                _task = task.GetAwaiter();
+                _scheduler = scheduler;
+                _token = token;
+            }
+
+            public bool IsCompleted => _task.IsCompleted;
+
+            public IAwaiter GetAwaiter() => this;
+
+            public void GetResult()
+            {
+                _token.ThrowIfCancellationRequested();
+
+                _task.GetResult();
+            }
+
+            public void OnCompleted(Action continuation)
+            {
+                var cancel = default(IDisposable);
+
+                if (_token.CanBeCanceled)
+                {
+                    cancel = _token.Register(() =>
+                    {
+                        Interlocked.Exchange(ref continuation, null)?.Invoke();
+                    });
+                }
+
+                try
+                {
+                    _task.OnCompleted(() =>
+                    {
+                        var t = _scheduler.ExecuteAsync(ct =>
+                        {
+                            cancel?.Dispose();
+
+                            Interlocked.Exchange(ref continuation, null)?.Invoke();
+
+                            return Task.CompletedTask;
+                        }, _token);
+                    });
+                }
+                catch
+                {
+                    cancel?.Dispose();
+                    throw;
+                }
+            }
+        }
+
+        private sealed class TaskAwaitable<T> : IAwaitable<T>, IAwaiter<T>
+        {
+            private readonly TaskAwaiter<T> _task;
+            private readonly IAsyncScheduler _scheduler;
+            private readonly CancellationToken _token;
+
+            public TaskAwaitable(Task<T> task, IAsyncScheduler scheduler, CancellationToken token)
+            {
+                _task = task.GetAwaiter();
+                _scheduler = scheduler;
+                _token = token;
+            }
+
+            public bool IsCompleted => _task.IsCompleted;
+
+            public IAwaiter<T> GetAwaiter() => this;
+
+            public T GetResult()
+            {
+                _token.ThrowIfCancellationRequested();
+
+                return _task.GetResult();
+            }
+
+            public void OnCompleted(Action continuation)
+            {
+                var cancel = default(IDisposable);
+
+                if (_token.CanBeCanceled)
+                {
+                    cancel = _token.Register(() =>
+                    {
+                        Interlocked.Exchange(ref continuation, null)?.Invoke();
+                    });
+                }
+
+                try
+                {
+                    _task.OnCompleted(() =>
+                    {
+                        var t = _scheduler.ExecuteAsync(ct =>
+                        {
+                            cancel?.Dispose();
+
+                            Interlocked.Exchange(ref continuation, null)?.Invoke();
+
+                            return Task.CompletedTask;
+                        }, _token);
+                    });
+                }
+                catch
+                {
+                    cancel?.Dispose();
+                    throw;
+                }
+            }
+        }
     }
 }

+ 3 - 3
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Buffer.cs

@@ -332,7 +332,7 @@ namespace System.Reactive.Linq
                             }
                         }
 
-                        await scheduler.Delay(timeSpan, ct).RendezVous(scheduler);
+                        await scheduler.Delay(timeSpan, ct).RendezVous(scheduler, ct);
                     }
                 }, timeSpan);
 
@@ -453,7 +453,7 @@ namespace System.Reactive.Linq
 
                                 if (buffer.Count > 0)
                                 {
-                                    await observer.OnNextAsync(buffer).RendezVous(scheduler);
+                                    await observer.OnNextAsync(buffer).RendezVous(scheduler, ct);
                                 }
                             }
 
@@ -463,7 +463,7 @@ namespace System.Reactive.Linq
                             }
                         }
 
-                        await scheduler.Delay(GetNextDue(), ct).RendezVous(scheduler);
+                        await scheduler.Delay(GetNextDue(), ct).RendezVous(scheduler, ct);
                     }
                 }, GetNextDue());
 

+ 4 - 4
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Delay.cs

@@ -79,7 +79,7 @@ namespace System.Reactive.Linq
             {
                 while (!ct.IsCancellationRequested)
                 {
-                    await semaphore.WaitAsync(ct).RendezVous(scheduler);
+                    await semaphore.WaitAsync(ct).RendezVous(scheduler, ct);
 
                     if (queue.Count > 0)
                     {
@@ -88,14 +88,14 @@ namespace System.Reactive.Linq
                         var nextDueTime = start + next.Interval + dueTime;
                         var delay = nextDueTime - scheduler.Now;
 
-                        await scheduler.Delay(delay, ct).RendezVous(scheduler);
+                        await scheduler.Delay(delay, ct).RendezVous(scheduler, ct);
 
-                        await observer.OnNextAsync(next.Value).RendezVous(scheduler);
+                        await observer.OnNextAsync(next.Value).RendezVous(scheduler, ct);
                     }
 
                     if (queue.Count == 0 && isDone)
                     {
-                        await observer.OnCompletedAsync().RendezVous(scheduler);
+                        await observer.OnCompletedAsync().RendezVous(scheduler, ct);
                         break;
                     }
                 }

+ 1 - 1
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Empty.cs

@@ -38,7 +38,7 @@ namespace System.Reactive.Linq
             {
                 ct.ThrowIfCancellationRequested();
 
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
     }

+ 22 - 22
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Generate.cs

@@ -259,25 +259,25 @@ namespace System.Reactive.Linq
                         }
                         else
                         {
-                            state = await iterate(state).RendezVous(scheduler);
+                            state = await iterate(state).RendezVous(scheduler, ct);
                         }
 
-                        hasResult = await condition(state).RendezVous(scheduler);
+                        hasResult = await condition(state).RendezVous(scheduler, ct);
 
                         if (hasResult)
                         {
-                            result = await resultSelector(state).RendezVous(scheduler);
+                            result = await resultSelector(state).RendezVous(scheduler, ct);
                         }
                     }
                     catch (Exception ex)
                     {
-                        await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                        await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                         return;
                     }
 
                     if (hasResult)
                     {
-                        await observer.OnNextAsync(result).RendezVous(scheduler);
+                        await observer.OnNextAsync(result).RendezVous(scheduler, ct);
                     }
                     else
                     {
@@ -287,7 +287,7 @@ namespace System.Reactive.Linq
 
                 if (!ct.IsCancellationRequested)
                 {
-                    await observer.OnCompletedAsync().RendezVous(scheduler);
+                    await observer.OnCompletedAsync().RendezVous(scheduler, ct);
                 }
             });
         }
@@ -376,38 +376,38 @@ namespace System.Reactive.Linq
                         }
                         else
                         {
-                            state = await iterate(state).RendezVous(scheduler);
+                            state = await iterate(state).RendezVous(scheduler, ct);
                         }
 
-                        hasResult = await condition(state).RendezVous(scheduler);
+                        hasResult = await condition(state).RendezVous(scheduler, ct);
 
                         if (hasResult)
                         {
-                            result = await resultSelector(state).RendezVous(scheduler);
-                            nextDue = await timeSelector(state).RendezVous(scheduler);
+                            result = await resultSelector(state).RendezVous(scheduler, ct);
+                            nextDue = await timeSelector(state).RendezVous(scheduler, ct);
                         }
                     }
                     catch (Exception ex)
                     {
-                        await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                        await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                         return;
                     }
 
                     if (hasResult)
                     {
-                        await observer.OnNextAsync(result).RendezVous(scheduler);
+                        await observer.OnNextAsync(result).RendezVous(scheduler, ct);
                     }
                     else
                     {
                         break;
                     }
 
-                    await scheduler.Delay(nextDue).RendezVous(scheduler);
+                    await scheduler.Delay(nextDue).RendezVous(scheduler, ct);
                 }
 
                 if (!ct.IsCancellationRequested)
                 {
-                    await observer.OnCompletedAsync().RendezVous(scheduler);
+                    await observer.OnCompletedAsync().RendezVous(scheduler, ct);
                 }
             });
         }
@@ -496,38 +496,38 @@ namespace System.Reactive.Linq
                         }
                         else
                         {
-                            state = await iterate(state).RendezVous(scheduler);
+                            state = await iterate(state).RendezVous(scheduler, ct);
                         }
 
-                        hasResult = await condition(state).RendezVous(scheduler);
+                        hasResult = await condition(state).RendezVous(scheduler, ct);
 
                         if (hasResult)
                         {
-                            result = await resultSelector(state).RendezVous(scheduler);
-                            nextDue = await timeSelector(state).RendezVous(scheduler);
+                            result = await resultSelector(state).RendezVous(scheduler, ct);
+                            nextDue = await timeSelector(state).RendezVous(scheduler, ct);
                         }
                     }
                     catch (Exception ex)
                     {
-                        await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                        await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                         return;
                     }
 
                     if (hasResult)
                     {
-                        await observer.OnNextAsync(result).RendezVous(scheduler);
+                        await observer.OnNextAsync(result).RendezVous(scheduler, ct);
                     }
                     else
                     {
                         break;
                     }
 
-                    await scheduler.Delay(nextDue).RendezVous(scheduler);
+                    await scheduler.Delay(nextDue).RendezVous(scheduler, ct);
                 }
 
                 if (!ct.IsCancellationRequested)
                 {
-                    await observer.OnCompletedAsync().RendezVous(scheduler);
+                    await observer.OnCompletedAsync().RendezVous(scheduler, ct);
                 }
             });
         }

+ 4 - 4
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/ObserveOn.cs

@@ -51,26 +51,26 @@ namespace System.Reactive.Linq
             {
                 while (!ct.IsCancellationRequested)
                 {
-                    await semaphore.WaitAsync(ct).RendezVous(scheduler);
+                    await semaphore.WaitAsync(ct).RendezVous(scheduler, ct);
 
                     if (queue.Count > 0)
                     {
                         var next = queue.Dequeue();
 
-                        await observer.OnNextAsync(next).RendezVous(scheduler);
+                        await observer.OnNextAsync(next).RendezVous(scheduler, ct);
                     }
 
                     if (queue.Count == 0)
                     {
                         if (isDone)
                         {
-                            await observer.OnCompletedAsync().RendezVous(scheduler);
+                            await observer.OnCompletedAsync().RendezVous(scheduler, ct);
                             break;
                         }
 
                         if (error != null)
                         {
-                            await observer.OnErrorAsync(error).RendezVous(scheduler);
+                            await observer.OnErrorAsync(error).RendezVous(scheduler, ct);
                             break;
                         }
                     }

+ 2 - 2
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Range.cs

@@ -47,12 +47,12 @@ namespace System.Reactive.Linq
 
                 for (int i = start, end = start + count - 1; i <= end && !ct.IsCancellationRequested; i++)
                 {
-                    await observer.OnNextAsync(i).RendezVous(scheduler);
+                    await observer.OnNextAsync(i).RendezVous(scheduler, ct);
                 }
 
                 ct.ThrowIfCancellationRequested();
 
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
     }

+ 3 - 3
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Repeat.cs

@@ -83,7 +83,7 @@ namespace System.Reactive.Linq
             {
                 while (!ct.IsCancellationRequested)
                 {
-                    await observer.OnNextAsync(value).RendezVous(scheduler);
+                    await observer.OnNextAsync(value).RendezVous(scheduler, ct);
                 }
             });
         }
@@ -113,14 +113,14 @@ namespace System.Reactive.Linq
 
                 while (!ct.IsCancellationRequested && i < repeatCount)
                 {
-                    await observer.OnNextAsync(value).RendezVous(scheduler);
+                    await observer.OnNextAsync(value).RendezVous(scheduler, ct);
 
                     i++;
                 }
 
                 if (i == repeatCount)
                 {
-                    await observer.OnCompletedAsync().RendezVous(scheduler);
+                    await observer.OnCompletedAsync().RendezVous(scheduler, ct);
                 }
             });
         }

+ 2 - 2
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Return.cs

@@ -38,11 +38,11 @@ namespace System.Reactive.Linq
             {
                 ct.ThrowIfCancellationRequested();
 
-                await observer.OnNextAsync(value).RendezVous(scheduler);
+                await observer.OnNextAsync(value).RendezVous(scheduler, ct);
 
                 ct.ThrowIfCancellationRequested();
 
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
     }

+ 1 - 1
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Skip.cs

@@ -156,7 +156,7 @@ namespace System.Reactive.Linq
                         {
                             ct.ThrowIfCancellationRequested();
 
-                            using (await gate.LockAsync().RendezVous(scheduler))
+                            using (await gate.LockAsync().RendezVous(scheduler, ct))
                             {
                                 open = true;
                             }

+ 1 - 1
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/SkipUntil.cs

@@ -178,7 +178,7 @@ namespace System.Reactive.Linq
                         {
                             ct.ThrowIfCancellationRequested();
 
-                            using (await gate.LockAsync().RendezVous(scheduler))
+                            using (await gate.LockAsync().RendezVous(scheduler, ct))
                             {
                                 open = true;
                             }

+ 3 - 3
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/SubscribeOn.cs

@@ -27,17 +27,17 @@ namespace System.Reactive.Linq
 
                 var scheduled = await scheduler.ScheduleAsync(async ct =>
                 {
-                    var subscription = await source.SubscribeSafeAsync(observer).RendezVous(scheduler);
+                    var subscription = await source.SubscribeSafeAsync(observer).RendezVous(scheduler, ct);
 
                     var scheduledDispose = AsyncDisposable.Create(async () =>
                     {
                         await scheduler.ScheduleAsync(async _ =>
                         {
-                            await subscription.DisposeAsync().RendezVous(scheduler);
+                            await subscription.DisposeAsync().RendezVous(scheduler, ct);
                         }).ConfigureAwait(false);
                     });
 
-                    await d.AssignAsync(scheduledDispose).RendezVous(scheduler);
+                    await d.AssignAsync(scheduledDispose).RendezVous(scheduler, ct);
                 }).ConfigureAwait(false);
 
                 await m.AssignAsync(scheduled).ConfigureAwait(false);

+ 2 - 2
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Take.cs

@@ -130,9 +130,9 @@ namespace System.Reactive.Linq
                         {
                             ct.ThrowIfCancellationRequested();
 
-                            using (await gate.LockAsync().RendezVous(scheduler))
+                            using (await gate.LockAsync().RendezVous(scheduler, ct))
                             {
-                                await observer.OnCompletedAsync().RendezVous(scheduler);
+                                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
                             }
                         }, duration)
                     );

+ 4 - 4
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/TakeLast.cs

@@ -170,12 +170,12 @@ namespace System.Reactive.Linq
                             {
                                 while (!ct.IsCancellationRequested && queue.Count > 0)
                                 {
-                                    await observer.OnNextAsync(queue.Dequeue()).RendezVous(scheduler);
+                                    await observer.OnNextAsync(queue.Dequeue()).RendezVous(scheduler, ct);
                                 }
 
                                 ct.ThrowIfCancellationRequested();
 
-                                await observer.OnCompletedAsync().RendezVous(scheduler);
+                                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
                             }).ConfigureAwait(false);
 
                             await sad.AssignAsync(drain).ConfigureAwait(false);
@@ -234,12 +234,12 @@ namespace System.Reactive.Linq
                             {
                                 while (!ct.IsCancellationRequested && queue.Count > 0)
                                 {
-                                    await observer.OnNextAsync(queue.Dequeue().Value).RendezVous(scheduler);
+                                    await observer.OnNextAsync(queue.Dequeue().Value).RendezVous(scheduler, ct);
                                 }
 
                                 ct.ThrowIfCancellationRequested();
 
-                                await observer.OnCompletedAsync().RendezVous(scheduler);
+                                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
                             }).ConfigureAwait(false);
 
                             await sad.AssignAsync(drain).ConfigureAwait(false);

+ 2 - 2
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/TakeUntil.cs

@@ -148,9 +148,9 @@ namespace System.Reactive.Linq
                         {
                             ct.ThrowIfCancellationRequested();
 
-                            using (await gate.LockAsync().RendezVous(scheduler))
+                            using (await gate.LockAsync().RendezVous(scheduler, ct))
                             {
-                                await observer.OnCompletedAsync().RendezVous(scheduler);
+                                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
                             }
                         }, endTime)
                     );

+ 1 - 1
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Throw.cs

@@ -43,7 +43,7 @@ namespace System.Reactive.Linq
             {
                 ct.ThrowIfCancellationRequested();
 
-                await observer.OnErrorAsync(error).RendezVous(scheduler);
+                await observer.OnErrorAsync(error).RendezVous(scheduler, ct);
             });
         }
     }

+ 2 - 2
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Timeout.cs

@@ -192,9 +192,9 @@ namespace System.Reactive.Linq
 
                         if (hasWon)
                         {
-                            var otherSubscription = await other.SubscribeSafeAsync(observer).RendezVous(scheduler);
+                            var otherSubscription = await other.SubscribeSafeAsync(observer).RendezVous(scheduler, ct);
 
-                            await subscription.AssignAsync(otherSubscription).RendezVous(scheduler);
+                            await subscription.AssignAsync(otherSubscription).RendezVous(scheduler, ct);
                         }
                     }, dueTime).ConfigureAwait(false);
 

+ 8 - 8
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Timer.cs

@@ -85,11 +85,11 @@ namespace System.Reactive.Linq
             {
                 ct.ThrowIfCancellationRequested();
 
-                await observer.OnNextAsync(0L).RendezVous(scheduler);
+                await observer.OnNextAsync(0L).RendezVous(scheduler, ct);
 
                 ct.ThrowIfCancellationRequested();
 
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             }, dueTime);
 
         }
@@ -105,11 +105,11 @@ namespace System.Reactive.Linq
             {
                 ct.ThrowIfCancellationRequested();
 
-                await observer.OnNextAsync(0L).RendezVous(scheduler);
+                await observer.OnNextAsync(0L).RendezVous(scheduler, ct);
 
                 ct.ThrowIfCancellationRequested();
 
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             }, dueTime);
         }
 
@@ -132,9 +132,9 @@ namespace System.Reactive.Linq
 
                 do
                 {
-                    await observer.OnNextAsync(tick++).RendezVous(scheduler);
+                    await observer.OnNextAsync(tick++).RendezVous(scheduler, ct);
 
-                    await scheduler.Delay(period, ct).RendezVous(scheduler);
+                    await scheduler.Delay(period, ct).RendezVous(scheduler, ct);
                 } while (!ct.IsCancellationRequested);
 
                 ct.ThrowIfCancellationRequested();
@@ -160,9 +160,9 @@ namespace System.Reactive.Linq
 
                 do
                 {
-                    await observer.OnNextAsync(tick++).RendezVous(scheduler);
+                    await observer.OnNextAsync(tick++).RendezVous(scheduler, ct);
 
-                    await scheduler.Delay(period, ct).RendezVous(scheduler);
+                    await scheduler.Delay(period, ct).RendezVous(scheduler, ct);
                 } while (!ct.IsCancellationRequested);
 
                 ct.ThrowIfCancellationRequested();

+ 102 - 102
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/ToAsync.Generated.cs

@@ -894,12 +894,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(res).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(res).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -931,12 +931,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(res).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(res).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -968,12 +968,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(res).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(res).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -1005,12 +1005,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(res).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(res).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -1042,12 +1042,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(res).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(res).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -1079,12 +1079,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(res).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(res).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -1116,12 +1116,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(res).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(res).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -1153,12 +1153,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(res).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(res).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -1190,12 +1190,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(res).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(res).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -1227,12 +1227,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(res).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(res).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -1264,12 +1264,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(res).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(res).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -1301,12 +1301,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(res).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(res).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -1338,12 +1338,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(res).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(res).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -1375,12 +1375,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(res).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(res).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -1412,12 +1412,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(res).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(res).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -1449,12 +1449,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(res).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(res).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -1486,12 +1486,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(res).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(res).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -1522,12 +1522,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -1558,12 +1558,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -1594,12 +1594,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -1630,12 +1630,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -1666,12 +1666,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -1702,12 +1702,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -1738,12 +1738,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -1774,12 +1774,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -1810,12 +1810,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -1846,12 +1846,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -1882,12 +1882,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -1918,12 +1918,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -1954,12 +1954,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -1990,12 +1990,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -2026,12 +2026,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -2062,12 +2062,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -2098,12 +2098,12 @@ namespace System.Reactive.Linq
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 

+ 6 - 6
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/ToAsync.Generated.tt

@@ -167,12 +167,12 @@ for (var i = 0; i <= 16; i++)
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(res).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(res).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 
@@ -231,12 +231,12 @@ for (var i = 0; i <= 16; i++)
                 }
                 catch (Exception ex)
                 {
-                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
                     return;
                 }
 
-                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                await observer.OnCompletedAsync().RendezVous(scheduler);
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler, ct);
+                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
             });
         }
 

+ 8 - 8
AsyncRx.NET/System.Reactive.Async.Linq/System/Threading/Tasks/TaskAsyncObservableExtensions.cs

@@ -72,14 +72,14 @@ namespace System.Threading.Tasks
                     switch (task.Status)
                     {
                         case TaskStatus.RanToCompletion:
-                            await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                            await observer.OnCompletedAsync().RendezVous(scheduler);
+                            await observer.OnNextAsync(Unit.Default).RendezVous(scheduler, ct);
+                            await observer.OnCompletedAsync().RendezVous(scheduler, ct);
                             break;
                         case TaskStatus.Faulted:
-                            await observer.OnErrorAsync(task.Exception.InnerException).RendezVous(scheduler);
+                            await observer.OnErrorAsync(task.Exception.InnerException).RendezVous(scheduler, ct);
                             break;
                         case TaskStatus.Canceled:
-                            await observer.OnErrorAsync(new TaskCanceledException(task)).RendezVous(scheduler);
+                            await observer.OnErrorAsync(new TaskCanceledException(task)).RendezVous(scheduler, ct);
                             break;
                     }
                 });
@@ -134,14 +134,14 @@ namespace System.Threading.Tasks
                     switch (task.Status)
                     {
                         case TaskStatus.RanToCompletion:
-                            await observer.OnNextAsync(task.Result).RendezVous(scheduler);
-                            await observer.OnCompletedAsync().RendezVous(scheduler);
+                            await observer.OnNextAsync(task.Result).RendezVous(scheduler, ct);
+                            await observer.OnCompletedAsync().RendezVous(scheduler, ct);
                             break;
                         case TaskStatus.Faulted:
-                            await observer.OnErrorAsync(task.Exception.InnerException).RendezVous(scheduler);
+                            await observer.OnErrorAsync(task.Exception.InnerException).RendezVous(scheduler, ct);
                             break;
                         case TaskStatus.Canceled:
-                            await observer.OnErrorAsync(new TaskCanceledException(task)).RendezVous(scheduler);
+                            await observer.OnErrorAsync(new TaskCanceledException(task)).RendezVous(scheduler, ct);
                             break;
                     }
                 });