فهرست منبع

Remove IObservable<T> conversion to its own file.

Bart De Smet 7 سال پیش
والد
کامیت
f08c7f063e

+ 160 - 0
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToAsyncEnumerable.Observable.cs

@@ -0,0 +1,160 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+namespace System.Linq
+{
+    public static partial class AsyncEnumerable
+    {
+        public static IAsyncEnumerable<TSource> ToAsyncEnumerable<TSource>(this IObservable<TSource> source)
+        {
+            if (source == null)
+                throw Error.ArgumentNull(nameof(source));
+
+            return CreateEnumerable(
+                ct =>
+                {
+                    var observer = new ToAsyncEnumerableObserver<TSource>();
+
+                    var subscription = source.Subscribe(observer);
+
+                    // REVIEW: Review possible concurrency issues with Dispose calls.
+
+                    var ctr = ct.Register(subscription.Dispose);
+
+                    return AsyncEnumerator.Create(
+                        tcs =>
+                        {
+                            var hasValue = false;
+                            var hasCompleted = false;
+                            var error = default(Exception);
+
+                            lock (observer.SyncRoot)
+                            {
+                                if (observer.Values.Count > 0)
+                                {
+                                    hasValue = true;
+                                    observer.Current = observer.Values.Dequeue();
+                                }
+                                else if (observer.HasCompleted)
+                                {
+                                    hasCompleted = true;
+                                }
+                                else if (observer.Error != null)
+                                {
+                                    error = observer.Error;
+                                }
+                                else
+                                {
+                                    observer.TaskCompletionSource = tcs;
+                                }
+                            }
+
+                            if (hasValue)
+                            {
+                                tcs.TrySetResult(true);
+                            }
+                            else if (hasCompleted)
+                            {
+                                tcs.TrySetResult(false);
+                            }
+                            else if (error != null)
+                            {
+                                tcs.TrySetException(error);
+                            }
+
+                            return new ValueTask<bool>(tcs.Task);
+                        },
+                        () => observer.Current,
+                        () =>
+                        {
+                            ctr.Dispose();
+                            subscription.Dispose();
+                            // Should we cancel in-flight operations somehow?
+                            return default;
+                        });
+                });
+        }
+
+        private sealed class ToAsyncEnumerableObserver<T> : IObserver<T>
+        {
+            public readonly Queue<T> Values;
+
+            public T Current;
+            public Exception Error;
+            public bool HasCompleted;
+            public TaskCompletionSource<bool> TaskCompletionSource;
+
+            public ToAsyncEnumerableObserver()
+            {
+                Values = new Queue<T>();
+            }
+
+            public object SyncRoot
+            {
+                get { return Values; }
+            }
+
+            public void OnCompleted()
+            {
+                var tcs = default(TaskCompletionSource<bool>);
+
+                lock (SyncRoot)
+                {
+                    HasCompleted = true;
+
+                    if (TaskCompletionSource != null)
+                    {
+                        tcs = TaskCompletionSource;
+                        TaskCompletionSource = null;
+                    }
+                }
+
+                tcs?.TrySetResult(false);
+            }
+
+            public void OnError(Exception error)
+            {
+                var tcs = default(TaskCompletionSource<bool>);
+
+                lock (SyncRoot)
+                {
+                    Error = error;
+
+                    if (TaskCompletionSource != null)
+                    {
+                        tcs = TaskCompletionSource;
+                        TaskCompletionSource = null;
+                    }
+                }
+
+                tcs?.TrySetException(error);
+            }
+
+            public void OnNext(T value)
+            {
+                var tcs = default(TaskCompletionSource<bool>);
+
+                lock (SyncRoot)
+                {
+                    if (TaskCompletionSource == null)
+                    {
+                        Values.Enqueue(value);
+                    }
+                    else
+                    {
+                        Current = value;
+
+                        tcs = TaskCompletionSource;
+                        TaskCompletionSource = null;
+                    }
+                }
+
+                tcs?.TrySetResult(true);
+            }
+        }
+    }
+}

+ 0 - 148
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToAsyncEnumerable.cs

@@ -29,76 +29,6 @@ namespace System.Linq
             return new AsyncEnumerableAdapter<TSource>(source);
         }
 
-        public static IAsyncEnumerable<TSource> ToAsyncEnumerable<TSource>(this IObservable<TSource> source)
-        {
-            if (source == null)
-                throw Error.ArgumentNull(nameof(source));
-
-            return CreateEnumerable(
-                ct =>
-                {
-                    var observer = new ToAsyncEnumerableObserver<TSource>();
-
-                    var subscription = source.Subscribe(observer);
-
-                    // REVIEW: Review possible concurrency issues with Dispose calls.
-
-                    var ctr = ct.Register(subscription.Dispose);
-
-                    return AsyncEnumerator.Create(
-                        tcs =>
-                        {
-                            var hasValue = false;
-                            var hasCompleted = false;
-                            var error = default(Exception);
-
-                            lock (observer.SyncRoot)
-                            {
-                                if (observer.Values.Count > 0)
-                                {
-                                    hasValue = true;
-                                    observer.Current = observer.Values.Dequeue();
-                                }
-                                else if (observer.HasCompleted)
-                                {
-                                    hasCompleted = true;
-                                }
-                                else if (observer.Error != null)
-                                {
-                                    error = observer.Error;
-                                }
-                                else
-                                {
-                                    observer.TaskCompletionSource = tcs;
-                                }
-                            }
-
-                            if (hasValue)
-                            {
-                                tcs.TrySetResult(true);
-                            }
-                            else if (hasCompleted)
-                            {
-                                tcs.TrySetResult(false);
-                            }
-                            else if (error != null)
-                            {
-                                tcs.TrySetException(error);
-                            }
-
-                            return new ValueTask<bool>(tcs.Task);
-                        },
-                        () => observer.Current,
-                        () =>
-                        {
-                            ctr.Dispose();
-                            subscription.Dispose();
-                            // Should we cancel in-flight operations somehow?
-                            return default;
-                        });
-                });
-        }
-
         private sealed class AsyncEnumerableAdapter<T> : AsyncIterator<T>, IAsyncIListProvider<T>
         {
             private readonly IEnumerable<T> _source;
@@ -359,83 +289,5 @@ namespace System.Linq
 
             bool ICollection<T>.IsReadOnly => _source.IsReadOnly;
         }
-
-        private sealed class ToAsyncEnumerableObserver<T> : IObserver<T>
-        {
-            public readonly Queue<T> Values;
-
-            public T Current;
-            public Exception Error;
-            public bool HasCompleted;
-            public TaskCompletionSource<bool> TaskCompletionSource;
-
-            public ToAsyncEnumerableObserver()
-            {
-                Values = new Queue<T>();
-            }
-
-            public object SyncRoot
-            {
-                get { return Values; }
-            }
-
-            public void OnCompleted()
-            {
-                var tcs = default(TaskCompletionSource<bool>);
-
-                lock (SyncRoot)
-                {
-                    HasCompleted = true;
-
-                    if (TaskCompletionSource != null)
-                    {
-                        tcs = TaskCompletionSource;
-                        TaskCompletionSource = null;
-                    }
-                }
-
-                tcs?.TrySetResult(false);
-            }
-
-            public void OnError(Exception error)
-            {
-                var tcs = default(TaskCompletionSource<bool>);
-
-                lock (SyncRoot)
-                {
-                    Error = error;
-
-                    if (TaskCompletionSource != null)
-                    {
-                        tcs = TaskCompletionSource;
-                        TaskCompletionSource = null;
-                    }
-                }
-
-                tcs?.TrySetException(error);
-            }
-
-            public void OnNext(T value)
-            {
-                var tcs = default(TaskCompletionSource<bool>);
-
-                lock (SyncRoot)
-                {
-                    if (TaskCompletionSource == null)
-                    {
-                        Values.Enqueue(value);
-                    }
-                    else
-                    {
-                        Current = value;
-
-                        tcs = TaskCompletionSource;
-                        TaskCompletionSource = null;
-                    }
-                }
-
-                tcs?.TrySetResult(true);
-            }
-        }
     }
 }