Browse Source

Fixing deadlock in ToAsyncEnumerable + reducing memory cost.

Bart De Smet 10 years ago
parent
commit
3fba59915a
1 changed files with 81 additions and 70 deletions
  1. 81 70
      Ix.NET/Source/System.Interactive.Async/AsyncEnumerable.Conversions.cs

+ 81 - 70
Ix.NET/Source/System.Interactive.Async/AsyncEnumerable.Conversions.cs

@@ -106,36 +106,48 @@ namespace System.Linq
             return Create(() =>
             {
                 var observer = new ToAsyncEnumerableObserver<TSource>();
-                observer.Queue = new Queue<Either<TSource, Exception, bool>>();
 
                 var subscription = source.Subscribe(observer);
 
                 return Create(
                     (ct, tcs) =>
                     {
-                        lock (observer.Queue)
+                        var hasValue = false;
+                        var hasCompleted = false;
+                        var error = default(Exception);
+
+                        lock (observer.SyncRoot)
                         {
-                            if (observer.Queue.Count > 0)
+                            if (observer.Values.Count > 0)
+                            {
+                                hasValue = true;
+                                observer.Current = observer.Values.Dequeue();
+                            }
+                            else if (observer.HasCompleted)
+                            {
+                                hasCompleted = true;
+                            }
+                            else if (observer.Error != null)
                             {
-                                var n = observer.Queue.Dequeue();
-                                n.Switch(
-                                    x =>
-                                    {
-                                        observer.Current = x;
-                                        tcs.TrySetResult(true);
-                                    },
-                                    ex =>
-                                    {
-                                        tcs.TrySetException(ex);
-                                    },
-                                    _ =>
-                                    {
-                                        tcs.TrySetResult(false);
-                                    }
-                                );
+                                error = observer.Error;
                             }
                             else
+                            {
                                 observer.TaskCompletionSource = tcs;
+                            }
+                        }
+
+                        if (hasValue)
+                        {
+                            tcs.TrySetResult(true);
+                        }
+                        else if (hasCompleted)
+                        {
+                            tcs.TrySetResult(false);
+                        }
+                        else if (error != null)
+                        {
+                            tcs.TrySetException(error);
                         }
 
                         return tcs.Task;
@@ -151,88 +163,87 @@ namespace System.Linq
 
         class ToAsyncEnumerableObserver<T> : IObserver<T>
         {
-            public Queue<Either<T, Exception, bool>> Queue { get; set; }
-            public T Current { get; set; }
-            public TaskCompletionSource<bool> TaskCompletionSource { get; set; }
+            public ToAsyncEnumerableObserver()
+            {
+                Values = new Queue<T>();
+            }
+
+            public object SyncRoot
+            {
+                get { return Values; }
+            }
+
+            public readonly Queue<T> Values;
+            public Exception Error;
+            public bool HasCompleted;
+
+            public T Current;
+            public TaskCompletionSource<bool> TaskCompletionSource;
 
             public void OnCompleted()
             {
-                lock (Queue)
+                var tcs = default(TaskCompletionSource<bool>);
+
+                lock (SyncRoot)
                 {
-                    if (TaskCompletionSource == null)
-                        Queue.Enqueue(new Either<T, Exception, bool>.Choice3(true));
-                    else
+                    HasCompleted = true;
+
+                    if (TaskCompletionSource != null)
                     {
-                        TaskCompletionSource.SetResult(false);
+                        tcs = TaskCompletionSource;
                         TaskCompletionSource = null;
                     }
                 }
+
+                if (tcs != null)
+                {
+                    tcs.SetResult(false);
+                }
             }
 
             public void OnError(Exception error)
             {
-                lock (Queue)
+                var tcs = default(TaskCompletionSource<bool>);
+
+                lock (SyncRoot)
                 {
-                    if (TaskCompletionSource == null)
-                        Queue.Enqueue(new Either<T, Exception, bool>.Choice2(error));
-                    else
+                    Error = error;
+
+                    if (TaskCompletionSource != null)
                     {
-                        TaskCompletionSource.SetException(error);
+                        tcs = TaskCompletionSource;
                         TaskCompletionSource = null;
                     }
                 }
+
+                if (tcs != null)
+                {
+                    tcs.SetException(error);
+                }
             }
 
             public void OnNext(T value)
             {
-                lock (Queue)
+                var tcs = default(TaskCompletionSource<bool>);
+
+                lock (SyncRoot)
                 {
                     if (TaskCompletionSource == null)
-                        Queue.Enqueue(new Either<T, Exception, bool>.Choice1(value));
+                    {
+                        Values.Enqueue(value);
+                    }
                     else
                     {
                         Current = value;
-                        TaskCompletionSource.SetResult(true);
+
+                        tcs = TaskCompletionSource;
                         TaskCompletionSource = null;
                     }
                 }
-            }
-        }
-
-        abstract class Either<T, U, V>
-        {
-            public abstract void Switch(Action<T> choice1, Action<U> choice2, Action<V> choice3);
-
-            public class Choice1 : Either<T, U, V>
-            {
-                public Choice1(T value) { Value = value; }
-                public T Value { get; private set; }
-
-                public override void Switch(Action<T> choice1, Action<U> choice2, Action<V> choice3)
-                {
-                    choice1(Value);
-                }
-            }
-
-            public class Choice2 : Either<T, U, V>
-            {
-                public Choice2(U value) { Value = value; }
-                public U Value { get; private set; }
-
-                public override void Switch(Action<T> choice1, Action<U> choice2, Action<V> choice3)
-                {
-                    choice2(Value);
-                }
-            }
-
-            public class Choice3 : Either<T, U, V>
-            {
-                public Choice3(V value) { Value = value; }
-                public V Value { get; private set; }
 
-                public override void Switch(Action<T> choice1, Action<U> choice2, Action<V> choice3)
+                if (tcs != null)
                 {
-                    choice3(Value);
+                    tcs.SetResult(true);
                 }
             }
         }