Bläddra i källkod

Adding some Buffer overloads.

Bart De Smet 8 år sedan
förälder
incheckning
3be5e7e394

+ 205 - 0
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Buffer.cs

@@ -149,6 +149,43 @@ namespace System.Reactive.Linq
                 return StableCompositeAsyncDisposable.Create(subscription, timer);
                 return StableCompositeAsyncDisposable.Create(subscription, timer);
             });
             });
         }
         }
+
+        public static IAsyncObservable<IList<TSource>> Buffer<TSource, TBufferBoundary>(this IAsyncObservable<TSource> source, IAsyncObservable<TBufferBoundary> bufferBoundaries)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (bufferBoundaries == null)
+                throw new ArgumentNullException(nameof(bufferBoundaries));
+
+            return Create<IList<TSource>>(async observer =>
+            {
+                var (sourceObserver, boundariesObserver) = AsyncObserver.Buffer<TSource, TBufferBoundary>(observer);
+
+                var sourceSubscription = await source.SubscribeSafeAsync(sourceObserver).ConfigureAwait(false);
+                var boundariesSubscription = await bufferBoundaries.SubscribeSafeAsync(boundariesObserver).ConfigureAwait(false);
+
+                return StableCompositeAsyncDisposable.Create(sourceSubscription, boundariesSubscription);
+            });
+        }
+
+        // REVIEW: This overload is inherited from Rx but arguably a bit esoteric as it doesn't provide context to the closing selector.
+
+        public static IAsyncObservable<IList<TSource>> Buffer<TSource, TBufferClosing>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TBufferClosing>> bufferClosingSelector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (bufferClosingSelector == null)
+                throw new ArgumentNullException(nameof(bufferClosingSelector));
+
+            return Create<IList<TSource>>(async observer =>
+            {
+                var (sourceObserver, closingDisposable) = await AsyncObserver.Buffer<TSource, TBufferClosing>(observer, bufferClosingSelector).ConfigureAwait(false);
+
+                var sourceSubscription = await source.SubscribeSafeAsync(sourceObserver).ConfigureAwait(false);
+
+                return StableCompositeAsyncDisposable.Create(sourceSubscription, closingDisposable);
+            });
+        }
     }
     }
 
 
     partial class AsyncObserver
     partial class AsyncObserver
@@ -522,5 +559,173 @@ namespace System.Reactive.Linq
                 return (sink, timer);
                 return (sink, timer);
             };
             };
         }
         }
+
+        public static (IAsyncObserver<TSource>, IAsyncObserver<TBufferBoundary>) Buffer<TSource, TBufferBoundary>(IAsyncObserver<IList<TSource>> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var gate = new AsyncLock();
+
+            var buffer = new List<TSource>();
+
+            return
+                (
+                    Create<TSource>(
+                        async x =>
+                        {
+                            using (await gate.LockAsync().ConfigureAwait(false))
+                            {
+                                buffer.Add(x);
+                            }
+                        },
+                        async ex =>
+                        {
+                            using (await gate.LockAsync().ConfigureAwait(false))
+                            {
+                                buffer.Clear();
+                                await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                            }
+                        },
+                        async () =>
+                        {
+                            using (await gate.LockAsync().ConfigureAwait(false))
+                            {
+                                await observer.OnNextAsync(buffer).ConfigureAwait(false);
+                                await observer.OnCompletedAsync().ConfigureAwait(false);
+                            }
+                        }
+                    ),
+                    Create<TBufferBoundary>(
+                        async x =>
+                        {
+                            using (await gate.LockAsync().ConfigureAwait(false))
+                            {
+                                var oldBuffer = buffer;
+                                buffer = new List<TSource>();
+                                await observer.OnNextAsync(oldBuffer).ConfigureAwait(false);
+                            }
+                        },
+                        async ex =>
+                        {
+                            using (await gate.LockAsync().ConfigureAwait(false))
+                            {
+                                buffer.Clear();
+                                await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                            }
+                        },
+                        async () =>
+                        {
+                            using (await gate.LockAsync().ConfigureAwait(false))
+                            {
+                                await observer.OnNextAsync(buffer).ConfigureAwait(false);
+                                await observer.OnCompletedAsync().ConfigureAwait(false);
+                            }
+                        }
+                    )
+                );
+        }
+
+        public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> Buffer<TSource, TBufferClosing>(IAsyncObserver<IList<TSource>> observer, Func<IAsyncObservable<TBufferClosing>> bufferClosingSelector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (bufferClosingSelector == null)
+                throw new ArgumentNullException(nameof(bufferClosingSelector));
+
+            async Task<(IAsyncObserver<TSource>, IAsyncDisposable)> CoreAsync()
+            {
+                var closeSubscription = new SerialAsyncDisposable();
+
+                var gate = new AsyncLock();
+
+                var buffer = new List<TSource>();
+
+                async Task CreateBufferCloseAsync()
+                {
+                    var closing = default(IAsyncObservable<TBufferClosing>);
+
+                    try
+                    {
+                        closing = bufferClosingSelector(); // REVIEW: Do we need an async variant?
+                    }
+                    catch (Exception ex)
+                    {
+                        using (await gate.LockAsync().ConfigureAwait(false))
+                        {
+                            buffer.Clear();
+                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                        }
+                    }
+
+                    var closingSubscription = new SingleAssignmentAsyncDisposable();
+                    await closeSubscription.AssignAsync(closingSubscription).ConfigureAwait(false);
+
+                    async Task CloseBufferAsync()
+                    {
+                        await closingSubscription.DisposeAsync().ConfigureAwait(false);
+
+                        using (await gate.LockAsync().ConfigureAwait(false))
+                        {
+                            var oldBuffer = buffer;
+                            buffer = new List<TSource>();
+                            await observer.OnNextAsync(oldBuffer).ConfigureAwait(false);
+                        }
+
+                        await CreateBufferCloseAsync().ConfigureAwait(false); // TODO: Use a traditional "async lock" to get queue behavior.
+                    }
+
+                    var closingObserver =
+                        Create<TBufferClosing>(
+                            x => CloseBufferAsync(),
+                            async ex =>
+                            {
+                                using (await gate.LockAsync().ConfigureAwait(false))
+                                {
+                                    buffer.Clear();
+                                    await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                                }
+                            },
+                            CloseBufferAsync
+                        );
+
+                    var closingSubscriptionInner = await closing.SubscribeAsync(closingObserver).ConfigureAwait(false);
+                    await closingSubscription.AssignAsync(closingSubscriptionInner).ConfigureAwait(false);
+                }
+
+                var sink =
+                    Create<TSource>(
+                        async x =>
+                        {
+                            using (await gate.LockAsync().ConfigureAwait(false))
+                            {
+                                buffer.Add(x);
+                            }
+                        },
+                        async ex =>
+                        {
+                            using (await gate.LockAsync().ConfigureAwait(false))
+                            {
+                                buffer.Clear();
+                                await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                            }
+                        },
+                        async () =>
+                        {
+                            using (await gate.LockAsync().ConfigureAwait(false))
+                            {
+                                await observer.OnNextAsync(buffer).ConfigureAwait(false);
+                                await observer.OnCompletedAsync().ConfigureAwait(false);
+                            }
+                        }
+                    );
+
+                await CreateBufferCloseAsync().ConfigureAwait(false);
+
+                return (sink, closeSubscription);
+            }
+
+            return CoreAsync();
+        }
     }
     }
 }
 }