|
@@ -3,77 +3,11 @@
|
|
|
// See the LICENSE file in the project root for more information.
|
|
|
|
|
|
using System.Collections.Generic;
|
|
|
-using System.Threading.Tasks;
|
|
|
|
|
|
namespace System.Linq
|
|
|
{
|
|
|
public static partial class AsyncEnumerable
|
|
|
{
|
|
|
- public static IAsyncEnumerable<TSource> ToAsyncEnumerable<TSource>(this IObservable<TSource> source)
|
|
|
- {
|
|
|
- if (source == null)
|
|
|
- throw new ArgumentNullException(nameof(source));
|
|
|
-
|
|
|
- return CreateEnumerable(
|
|
|
- () =>
|
|
|
- {
|
|
|
- var observer = new ToAsyncEnumerableObserver<TSource>();
|
|
|
-
|
|
|
- var subscription = source.Subscribe(observer);
|
|
|
-
|
|
|
- return CreateEnumerator(
|
|
|
- tcs =>
|
|
|
- {
|
|
|
- var hasValue = false;
|
|
|
- var hasCompleted = false;
|
|
|
- var error = default(Exception);
|
|
|
-
|
|
|
- lock (observer.SyncRoot)
|
|
|
- {
|
|
|
- if (observer.Values.Count > 0)
|
|
|
- {
|
|
|
- hasValue = true;
|
|
|
- observer.Current = observer.Values.Dequeue();
|
|
|
- }
|
|
|
- else if (observer.HasCompleted)
|
|
|
- {
|
|
|
- hasCompleted = true;
|
|
|
- }
|
|
|
- else if (observer.Error != null)
|
|
|
- {
|
|
|
- error = observer.Error;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- observer.TaskCompletionSource = tcs;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if (hasValue)
|
|
|
- {
|
|
|
- tcs.TrySetResult(true);
|
|
|
- }
|
|
|
- else if (hasCompleted)
|
|
|
- {
|
|
|
- tcs.TrySetResult(false);
|
|
|
- }
|
|
|
- else if (error != null)
|
|
|
- {
|
|
|
- tcs.TrySetException(error);
|
|
|
- }
|
|
|
-
|
|
|
- return tcs.Task;
|
|
|
- },
|
|
|
- () => observer.Current,
|
|
|
- () =>
|
|
|
- {
|
|
|
- subscription.Dispose();
|
|
|
- // Should we cancel in-flight operations somehow?
|
|
|
- return TaskExt.True;
|
|
|
- });
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
public static IObservable<TSource> ToObservable<TSource>(this IAsyncEnumerable<TSource> source)
|
|
|
{
|
|
|
if (source == null)
|
|
@@ -82,84 +16,6 @@ namespace System.Linq
|
|
|
return new ToObservableObservable<TSource>(source);
|
|
|
}
|
|
|
|
|
|
- private sealed class ToAsyncEnumerableObserver<T> : IObserver<T>
|
|
|
- {
|
|
|
- public readonly Queue<T> Values;
|
|
|
-
|
|
|
- public T Current;
|
|
|
- public Exception Error;
|
|
|
- public bool HasCompleted;
|
|
|
- public TaskCompletionSource<bool> TaskCompletionSource;
|
|
|
-
|
|
|
- public ToAsyncEnumerableObserver()
|
|
|
- {
|
|
|
- Values = new Queue<T>();
|
|
|
- }
|
|
|
-
|
|
|
- public object SyncRoot
|
|
|
- {
|
|
|
- get { return Values; }
|
|
|
- }
|
|
|
-
|
|
|
- public void OnCompleted()
|
|
|
- {
|
|
|
- var tcs = default(TaskCompletionSource<bool>);
|
|
|
-
|
|
|
- lock (SyncRoot)
|
|
|
- {
|
|
|
- HasCompleted = true;
|
|
|
-
|
|
|
- if (TaskCompletionSource != null)
|
|
|
- {
|
|
|
- tcs = TaskCompletionSource;
|
|
|
- TaskCompletionSource = null;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- tcs?.TrySetResult(false);
|
|
|
- }
|
|
|
-
|
|
|
- public void OnError(Exception error)
|
|
|
- {
|
|
|
- var tcs = default(TaskCompletionSource<bool>);
|
|
|
-
|
|
|
- lock (SyncRoot)
|
|
|
- {
|
|
|
- Error = error;
|
|
|
-
|
|
|
- if (TaskCompletionSource != null)
|
|
|
- {
|
|
|
- tcs = TaskCompletionSource;
|
|
|
- TaskCompletionSource = null;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- tcs?.TrySetException(error);
|
|
|
- }
|
|
|
-
|
|
|
- public void OnNext(T value)
|
|
|
- {
|
|
|
- var tcs = default(TaskCompletionSource<bool>);
|
|
|
-
|
|
|
- lock (SyncRoot)
|
|
|
- {
|
|
|
- if (TaskCompletionSource == null)
|
|
|
- {
|
|
|
- Values.Enqueue(value);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- Current = value;
|
|
|
-
|
|
|
- tcs = TaskCompletionSource;
|
|
|
- TaskCompletionSource = null;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- tcs?.TrySetResult(true);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
private sealed class ToObservableObservable<T> : IObservable<T>
|
|
|
{
|
|
|
private readonly IAsyncEnumerable<T> source;
|
|
@@ -175,38 +31,40 @@ namespace System.Linq
|
|
|
var e = source.GetAsyncEnumerator();
|
|
|
|
|
|
var f = default(Action);
|
|
|
- f = () => e.MoveNextAsync()
|
|
|
- .ContinueWith(async t =>
|
|
|
- {
|
|
|
- if (t.IsFaulted)
|
|
|
- {
|
|
|
- observer.OnError(t.Exception);
|
|
|
- await e.DisposeAsync().ConfigureAwait(false);
|
|
|
- }
|
|
|
- else if (t.IsCanceled)
|
|
|
- {
|
|
|
- await e.DisposeAsync().ConfigureAwait(false);
|
|
|
- }
|
|
|
- else if (t.IsCompleted)
|
|
|
- {
|
|
|
- if (t.Result)
|
|
|
- {
|
|
|
- observer.OnNext(e.Current);
|
|
|
+ f = () => e.MoveNextAsync().ContinueWith(
|
|
|
+ async t =>
|
|
|
+ {
|
|
|
+ if (t.IsFaulted)
|
|
|
+ {
|
|
|
+ observer.OnError(t.Exception);
|
|
|
+ await e.DisposeAsync().ConfigureAwait(false);
|
|
|
+ }
|
|
|
+ else if (t.IsCanceled)
|
|
|
+ {
|
|
|
+ await e.DisposeAsync().ConfigureAwait(false);
|
|
|
+ }
|
|
|
+ else if (t.IsCompleted)
|
|
|
+ {
|
|
|
+ if (t.Result)
|
|
|
+ {
|
|
|
+ observer.OnNext(e.Current);
|
|
|
|
|
|
- if (!ctd.Token.IsCancellationRequested)
|
|
|
- f();
|
|
|
-
|
|
|
- //In case cancellation is requested, this could only have happened
|
|
|
- //by disposing the returned composite disposable (see below).
|
|
|
- //In that case, e will be disposed too, so there is no need to dispose e here.
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- observer.OnCompleted();
|
|
|
- await e.DisposeAsync().ConfigureAwait(false);
|
|
|
- }
|
|
|
- }
|
|
|
- }, ctd.Token);
|
|
|
+ if (!ctd.Token.IsCancellationRequested)
|
|
|
+ {
|
|
|
+ f();
|
|
|
+ }
|
|
|
+
|
|
|
+ // In case cancellation is requested, this could only have happened
|
|
|
+ // by disposing the returned composite disposable (see below).
|
|
|
+ // In that case, e will be disposed too, so there is no need to dispose e here.
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ observer.OnCompleted();
|
|
|
+ await e.DisposeAsync().ConfigureAwait(false);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }, ctd.Token);
|
|
|
|
|
|
f();
|
|
|
|