Переглянути джерело

Using queue lock in Buffer.

Bart De Smet 8 роки тому
батько
коміт
beba3f6506

+ 3 - 2
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Buffer.cs

@@ -638,6 +638,7 @@ namespace System.Reactive.Linq
                 var closeSubscription = new SerialAsyncDisposable();
 
                 var gate = new AsyncLock();
+                var queueLock = new AsyncQueueLock();
 
                 var buffer = new List<TSource>();
 
@@ -672,7 +673,7 @@ namespace System.Reactive.Linq
                             await observer.OnNextAsync(oldBuffer).ConfigureAwait(false);
                         }
 
-                        await CreateBufferCloseAsync().ConfigureAwait(false); // TODO: Use a traditional "async lock" to get queue behavior.
+                        await queueLock.WaitAsync(CreateBufferCloseAsync).ConfigureAwait(false);
                     }
 
                     var closingObserver =
@@ -720,7 +721,7 @@ namespace System.Reactive.Linq
                         }
                     );
 
-                await CreateBufferCloseAsync().ConfigureAwait(false);
+                await queueLock.WaitAsync(CreateBufferCloseAsync).ConfigureAwait(false);
 
                 return (sink, closeSubscription);
             }

+ 1 - 1
AsyncRx.NET/System.Reactive.Bcl/System/Threading/AsyncQueueLock.cs

@@ -15,7 +15,7 @@ namespace System.Threading
         private bool _isAcquired;
         private bool _hasFaulted;
 
-        public async Task Wait(Func<Task> action)
+        public async Task WaitAsync(Func<Task> action)
         {
             if (action == null)
                 throw new ArgumentNullException(nameof(action));