浏览代码

Implement AsyncObserverBase<T> for real.

Bart De Smet 8 年之前
父节点
当前提交
ee3bca15d3
共有 1 个文件被更改,包括 50 次插入3 次删除
  1. 50 3
      AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/AsyncObserverBase.cs

+ 50 - 3
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/AsyncObserverBase.cs

@@ -2,15 +2,31 @@
 // The .NET Foundation licenses this file to you under the Apache 2.0 License.
 // See the LICENSE file in the project root for more information. 
 
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
 {
     public abstract class AsyncObserverBase<T> : IAsyncObserver<T>
     {
+        private const int Idle = 0;
+        private const int Busy = 1;
+        private const int Done = 2;
+
+        private int _status = Idle;
+
         public Task OnCompletedAsync()
         {
-            throw new NotImplementedException();
+            TryEnter();
+
+            try
+            {
+                return OnCompletedAsyncCore();
+            }
+            finally
+            {
+                Interlocked.Exchange(ref _status, Done);
+            }
         }
 
         protected abstract Task OnCompletedAsyncCore();
@@ -20,16 +36,47 @@ namespace System.Reactive.Linq
             if (error == null)
                 throw new ArgumentNullException(nameof(error));
 
-            throw new NotImplementedException();
+            TryEnter();
+
+            try
+            {
+                return OnErrorAsyncCore(error);
+            }
+            finally
+            {
+                Interlocked.Exchange(ref _status, Done);
+            }
         }
 
         protected abstract Task OnErrorAsyncCore(Exception error);
 
         public Task OnNextAsync(T value)
         {
-            throw new NotImplementedException();
+            TryEnter();
+
+            try
+            {
+                return OnNextAsync(value);
+            }
+            finally
+            {
+                Interlocked.Exchange(ref _status, Idle);
+            }
         }
 
         protected abstract Task OnNextAsyncCore(T value);
+
+        private void TryEnter()
+        {
+            var old = Interlocked.CompareExchange(ref _status, Busy, Idle);
+
+            switch (old)
+            {
+                case Busy:
+                    throw new InvalidOperationException("The observer is currently processing a notification.");
+                case Done:
+                    throw new InvalidOperationException("The observer has already terminated.");
+            }
+        }
     }
 }