Browse Source

Implementing more Append overloads.

Bart De Smet 8 năm trước cách đây
mục cha
commit
60fb0abeb9

+ 88 - 2
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Append.cs

@@ -193,7 +193,30 @@ namespace System.Reactive.Linq
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            throw new NotImplementedException();
+            var d = new SingleAssignmentAsyncDisposable();
+
+            return
+                (
+                    Create<TSource>(
+                        observer.OnNextAsync,
+                        observer.OnErrorAsync,
+                        async () =>
+                        {
+                            var task = await scheduler.ScheduleAsync(async ct =>
+                            {
+                                for (var i = 0; i < values.Length && !ct.IsCancellationRequested; i++)
+                                {
+                                    await observer.OnNextAsync(values[i]).RendezVous(scheduler, ct);
+                                }
+
+                                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
+                            }).ConfigureAwait(false);
+
+                            await d.AssignAsync(task).ConfigureAwait(false);
+                        }
+                    ),
+                    d
+                );
         }
 
         public static IAsyncObserver<TSource> Append<TSource>(IAsyncObserver<TSource> observer, IEnumerable<TSource> values)
@@ -227,7 +250,70 @@ namespace System.Reactive.Linq
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            throw new NotImplementedException();
+            var d = new SingleAssignmentAsyncDisposable();
+
+            return
+                (
+                    Create<TSource>(
+                        observer.OnNextAsync,
+                        observer.OnErrorAsync,
+                        async () =>
+                        {
+                            var task = await scheduler.ScheduleAsync(async ct =>
+                            {
+                                var e = default(IEnumerator<TSource>);
+
+                                try
+                                {
+                                    e = values.GetEnumerator();
+                                }
+                                catch (Exception ex)
+                                {
+                                    await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
+                                    return;
+                                }
+
+                                using (e)
+                                {
+                                    while (!ct.IsCancellationRequested)
+                                    {
+                                        var b = default(bool);
+                                        var value = default(TSource);
+
+                                        try
+                                        {
+                                            b = e.MoveNext();
+
+                                            if (b)
+                                            {
+                                                value = e.Current;
+                                            }
+                                        }
+                                        catch (Exception ex)
+                                        {
+                                            await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
+                                            return;
+                                        }
+
+                                        if (b)
+                                        {
+                                            await observer.OnNextAsync(value).RendezVous(scheduler, ct);
+                                        }
+                                        else
+                                        {
+                                            break;
+                                        }
+                                    }
+                                }
+
+                                await observer.OnCompletedAsync().RendezVous(scheduler, ct);
+                            }).ConfigureAwait(false);
+
+                            await d.AssignAsync(task).ConfigureAwait(false);
+                        }
+                    ),
+                    d
+                );
         }
     }
 }